| Index: lucene/CHANGES.txt |
| =================================================================== |
| --- lucene/CHANGES.txt (revision 1214418) |
| +++ lucene/CHANGES.txt (working copy) |
| @@ -751,6 +751,12 @@ |
| where they would create invalid offsets in some situations, leading to problems |
| in highlighting. (Max Beutel via Robert Muir) |
| |
| +* LUCENE-3639: TopDocs.merge was incorrectly setting TopDocs.maxScore to |
| + Float.MIN_VALUE when it should be Float.NaN, when there were 0 |
| + hits. Improved age calculation in SearcherLifetimeManager, to have |
| + double precision and to compute age to be how long ago the searcher |
| + was replaced with a new searcher (Mike McCandless) |
| + |
| Documentation |
| |
| * LUCENE-3597: Fixed incorrect grouping documentation. (Martijn van Groningen, Robert Muir) |
| Index: lucene/src/test/org/apache/lucene/search/TestShardSearching.java |
| =================================================================== |
| --- lucene/src/test/org/apache/lucene/search/TestShardSearching.java (revision 0) |
| +++ lucene/src/test/org/apache/lucene/search/TestShardSearching.java (working copy) |
| @@ -0,0 +1,393 @@ |
| +package org.apache.lucene.search; |
| + |
| +/** |
| + * Licensed to the Apache Software Foundation (ASF) under one or more |
| + * contributor license agreements. See the NOTICE file distributed with |
| + * this work for additional information regarding copyright ownership. |
| + * The ASF licenses this file to You under the Apache License, Version 2.0 |
| + * (the "License"); you may not use this file except in compliance with |
| + * the License. You may obtain a copy of the License at |
| + * |
| + * http://www.apache.org/licenses/LICENSE-2.0 |
| + * |
| + * Unless required by applicable law or agreed to in writing, software |
| + * distributed under the License is distributed on an "AS IS" BASIS, |
| + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| + * See the License for the specific language governing permissions and |
| + * limitations under the License. |
| + */ |
| + |
| +import java.io.IOException; |
| +import java.util.ArrayList; |
| +import java.util.Collections; |
| +import java.util.List; |
| + |
| +import org.apache.lucene.index.IndexReader; |
| +import org.apache.lucene.index.MultiFields; |
| +import org.apache.lucene.index.MultiReader; |
| +import org.apache.lucene.index.Term; |
| +import org.apache.lucene.index.TermsEnum; |
| +import org.apache.lucene.util.BytesRef; |
| +import org.apache.lucene.util.LuceneTestCase.UseNoMemoryExpensiveCodec; |
| +import org.apache.lucene.util._TestUtil; |
| + |
| +// TODO |
| +// - other queries besides PrefixQuery & TermQuery (but: |
| +// FuzzyQ will be problematic... the top N terms it |
| +// takes means results will differ) |
| +// - NRQ/F |
| +// - BQ, negated clauses, negated prefix clauses |
| +// - test pulling docs in 2nd round trip... |
| +// - filter too |
| + |
| +@UseNoMemoryExpensiveCodec |
| +public class TestShardSearching extends ShardSearchingTestBase { |
| + |
| + private static class PreviousSearchState { |
| + public final long searchTimeNanos; |
| + public final long[] versions; |
| + public final ScoreDoc searchAfterLocal; |
| + public final ScoreDoc searchAfterShard; |
| + public final Sort sort; |
| + public final Query query; |
| + public final int numHitsPaged; |
| + |
| + public PreviousSearchState(Query query, Sort sort, ScoreDoc searchAfterLocal, ScoreDoc searchAfterShard, long[] versions, int numHitsPaged) { |
| + this.versions = versions.clone(); |
| + this.searchAfterLocal = searchAfterLocal; |
| + this.searchAfterShard = searchAfterShard; |
| + this.sort = sort; |
| + this.query = query; |
| + this.numHitsPaged = numHitsPaged; |
| + searchTimeNanos = System.nanoTime(); |
| + } |
| + } |
| + |
| + public void testSimple() throws Exception { |
| + final int numNodes = _TestUtil.nextInt(random, 1, 10); |
| + |
| + final double runTimeSec = 5.0 * RANDOM_MULTIPLIER; |
| + |
| + final int minDocsToMakeTerms = _TestUtil.nextInt(random, 5, 20); |
| + |
| + final int maxSearcherAgeSeconds = _TestUtil.nextInt(random, 1, 4); |
| + |
| + if (VERBOSE) { |
| + System.out.println("TEST: numNodes=" + numNodes + " runTimeSec=" + runTimeSec + " maxSearcherAgeSeconds=" + maxSearcherAgeSeconds); |
| + } |
| + |
| + start(_TestUtil.getTempDir("TestShardSearching").toString(), |
| + numNodes, |
| + runTimeSec, |
| + maxSearcherAgeSeconds |
| + ); |
| + |
| + final List<PreviousSearchState> priorSearches = new ArrayList<PreviousSearchState>(); |
| + List<BytesRef> terms = null; |
| + while (System.nanoTime() < endTimeNanos) { |
| + |
| + final boolean doFollowon = priorSearches.size() > 0 && random.nextInt(7) == 1; |
| + |
| + // Pick a random node; we will run the query on this node: |
| + final int myNodeID = random.nextInt(numNodes); |
| + |
| + final NodeState.ShardIndexSearcher localShardSearcher; |
| + |
| + final PreviousSearchState prevSearchState; |
| + |
| + if (doFollowon) { |
| + // Pretend user issued a followon query: |
| + prevSearchState = priorSearches.get(random.nextInt(priorSearches.size())); |
| + |
| + if (VERBOSE) { |
| + System.out.println("\nTEST: follow-on query age=" + ((System.nanoTime() - prevSearchState.searchTimeNanos)/1000000000.0)); |
| + } |
| + |
| + try { |
| + localShardSearcher = nodes[myNodeID].acquire(prevSearchState.versions); |
| + } catch (SearcherExpiredException see) { |
| + // Expected, sometimes; in a "real" app we would |
| + // either forward this error to the user ("too |
| + // much time has passed; please re-run your |
| + // search") or sneakily just switch to newest |
| + // searcher w/o telling them... |
| + if (VERBOSE) { |
| + System.out.println(" searcher expired during local shard searcher init: " + see); |
| + } |
| + priorSearches.remove(prevSearchState); |
| + continue; |
| + } |
| + } else { |
| + if (VERBOSE) { |
| + System.out.println("\nTEST: fresh query"); |
| + } |
| + // Do fresh query: |
| + localShardSearcher = nodes[myNodeID].acquire(); |
| + prevSearchState = null; |
| + } |
| + |
| + final IndexReader[] subs = new IndexReader[numNodes]; |
| + |
| + PreviousSearchState searchState = null; |
| + |
| + try { |
| + |
| + // Mock: now make a single reader (MultiReader) from all node |
| + // searchers. In a real shard env you can't do this... we |
| + // do it to confirm results from the shard searcher |
| + // are correct: |
| + int docCount = 0; |
| + try { |
| + for(int nodeID=0;nodeID<numNodes;nodeID++) { |
| + final long subVersion = localShardSearcher.nodeVersions[nodeID]; |
| + final IndexSearcher sub = nodes[nodeID].searchers.acquire(subVersion); |
| + if (sub == null) { |
| + nodeID--; |
| + while(nodeID >= 0) { |
| + subs[nodeID].decRef(); |
| + subs[nodeID] = null; |
| + nodeID--; |
| + } |
| + throw new SearcherExpiredException("nodeID=" + nodeID + " version=" + subVersion); |
| + } |
| + subs[nodeID] = sub.getIndexReader(); |
| + docCount += subs[nodeID].maxDoc(); |
| + } |
| + } catch (SearcherExpiredException see) { |
| + // Expected |
| + if (VERBOSE) { |
| + System.out.println(" searcher expired during mock reader init: " + see); |
| + } |
| + continue; |
| + } |
| + |
| + final IndexReader mockReader = new MultiReader(subs); |
| + final IndexSearcher mockSearcher = new IndexSearcher(mockReader); |
| + |
| + Query query; |
| + Sort sort; |
| + |
| + if (prevSearchState != null) { |
| + query = prevSearchState.query; |
| + sort = prevSearchState.sort; |
| + } else { |
| + if (terms == null && docCount > minDocsToMakeTerms) { |
| + // TODO: try to "focus" on high freq terms sometimes too |
| + // TODO: maybe also periodically reset the terms...? |
| + final TermsEnum termsEnum = MultiFields.getTerms(mockReader, "body").iterator(null); |
| + terms = new ArrayList<BytesRef>(); |
| + while(termsEnum.next() != null) { |
| + terms.add(BytesRef.deepCopyOf(termsEnum.term())); |
| + } |
| + if (VERBOSE) { |
| + System.out.println("TEST: init terms: " + terms.size() + " terms"); |
| + } |
| + if (terms.size() == 0) { |
| + terms = null; |
| + } |
| + } |
| + |
| + if (VERBOSE) { |
| + System.out.println(" maxDoc=" + mockReader.maxDoc()); |
| + } |
| + |
| + if (terms != null) { |
| + if (random.nextBoolean()) { |
| + query = new TermQuery(new Term("body", terms.get(random.nextInt(terms.size())))); |
| + } else { |
| + final String t = terms.get(random.nextInt(terms.size())).utf8ToString(); |
| + final String prefix; |
| + if (t.length() <= 1) { |
| + prefix = t; |
| + } else { |
| + prefix = t.substring(0, _TestUtil.nextInt(random, 1, 2)); |
| + } |
| + query = new PrefixQuery(new Term("body", prefix)); |
| + } |
| + |
| + if (random.nextBoolean()) { |
| + sort = null; |
| + } else { |
| + // TODO: sort by more than 1 field |
| + final int what = random.nextInt(3); |
| + if (what == 0) { |
| + sort = new Sort(SortField.FIELD_SCORE); |
| + } else if (what == 1) { |
| + // TODO: this sort doesn't merge |
| + // correctly... it's tricky because you |
| + // could have > 2.1B docs across all shards: |
| + //sort = new Sort(SortField.FIELD_DOC); |
| + sort = null; |
| + } else if (what == 2) { |
| + sort = new Sort(new SortField[] {new SortField("docid", SortField.Type.INT, random.nextBoolean())}); |
| + } else { |
| + sort = new Sort(new SortField[] {new SortField("title", SortField.Type.STRING, random.nextBoolean())}); |
| + } |
| + } |
| + } else { |
| + query = null; |
| + sort = null; |
| + } |
| + } |
| + |
| + if (query != null) { |
| + |
| + try { |
| + searchState = assertSame(mockSearcher, localShardSearcher, query, sort, prevSearchState); |
| + } catch (SearcherExpiredException see) { |
| + // Expected; in a "real" app we would |
| + // either forward this error to the user ("too |
| + // much time has passed; please re-run your |
| + // search") or sneakily just switch to newest |
| + // searcher w/o telling them... |
| + if (VERBOSE) { |
| + System.out.println(" searcher expired during search: " + see); |
| + see.printStackTrace(System.out); |
| + } |
| + assert prevSearchState != null; |
| + priorSearches.remove(prevSearchState); |
| + } |
| + } |
| + } finally { |
| + nodes[myNodeID].release(localShardSearcher); |
| + for(IndexReader sub : subs) { |
| + if (sub != null) { |
| + sub.decRef(); |
| + } |
| + } |
| + } |
| + |
| + if (searchState != null && searchState.searchAfterLocal != null && random.nextInt(5) == 3) { |
| + priorSearches.add(searchState); |
| + if (priorSearches.size() > 200) { |
| + Collections.shuffle(priorSearches, random); |
| + priorSearches.subList(100, priorSearches.size()).clear(); |
| + } |
| + } |
| + } |
| + |
| + finish(); |
| + } |
| + |
| + private PreviousSearchState assertSame(IndexSearcher mockSearcher, NodeState.ShardIndexSearcher shardSearcher, Query q, Sort sort, PreviousSearchState state) throws IOException { |
| + |
| + int numHits = _TestUtil.nextInt(random, 1, 100); |
| + if (state != null && state.searchAfterLocal == null) { |
| + // In addition to what we last searched: |
| + numHits += state.numHitsPaged; |
| + } |
| + |
| + if (VERBOSE) { |
| + System.out.println("TEST: query=" + q + " sort=" + sort + " numHits=" + numHits); |
| + if (state != null) { |
| + System.out.println(" prev: searchAfterLocal=" + state.searchAfterLocal + " searchAfterShard=" + state.searchAfterShard + " numHitsPaged=" + state.numHitsPaged); |
| + } |
| + } |
| + |
| + // Single (mock local) searcher: |
| + final TopDocs hits; |
| + if (sort == null) { |
| + if (state != null && state.searchAfterLocal != null) { |
| + hits = mockSearcher.searchAfter(state.searchAfterLocal, q, numHits); |
| + } else { |
| + hits = mockSearcher.search(q, numHits); |
| + } |
| + } else { |
| + hits = mockSearcher.search(q, numHits, sort); |
| + } |
| + |
| + // Shard searcher |
| + final TopDocs shardHits; |
| + if (sort == null) { |
| + if (state != null && state.searchAfterShard != null) { |
| + shardHits = shardSearcher.searchAfter(state.searchAfterShard, q, numHits); |
| + } else { |
| + shardHits = shardSearcher.search(q, numHits); |
| + } |
| + } else { |
| + shardHits = shardSearcher.search(q, numHits, sort); |
| + } |
| + |
| + final int numNodes = shardSearcher.nodeVersions.length; |
| + int[] base = new int[numNodes]; |
| + final IndexReader[] subs = mockSearcher.getIndexReader().getSequentialSubReaders(); |
| + assertEquals(numNodes, subs.length); |
| + |
| + int docCount = 0; |
| + for(int nodeID=0;nodeID<numNodes;nodeID++) { |
| + base[nodeID] = docCount; |
| + docCount += subs[nodeID].maxDoc(); |
| + } |
| + |
| + if (VERBOSE) { |
| + /* |
| + for(int shardID=0;shardID<shardSearchers.length;shardID++) { |
| + System.out.println(" shard=" + shardID + " maxDoc=" + shardSearchers[shardID].searcher.getIndexReader().maxDoc()); |
| + } |
| + */ |
| + System.out.println(" single searcher: " + hits.totalHits + " totalHits maxScore=" + hits.getMaxScore()); |
| + for(int i=0;i<hits.scoreDocs.length;i++) { |
| + final ScoreDoc sd = hits.scoreDocs[i]; |
| + System.out.println(" doc=" + sd.doc + " score=" + sd.score); |
| + } |
| + System.out.println(" shard searcher: " + shardHits.totalHits + " totalHits maxScore=" + shardHits.getMaxScore()); |
| + for(int i=0;i<shardHits.scoreDocs.length;i++) { |
| + final ScoreDoc sd = shardHits.scoreDocs[i]; |
| + System.out.println(" doc=" + sd.doc + " (rebased: " + (sd.doc + base[sd.shardIndex]) + ") score=" + sd.score + " shard=" + sd.shardIndex); |
| + } |
| + } |
| + |
| + int numHitsPaged; |
| + if (state != null && state.searchAfterLocal != null) { |
| + numHitsPaged = hits.scoreDocs.length; |
| + if (state != null) { |
| + numHitsPaged += state.numHitsPaged; |
| + } |
| + } else { |
| + numHitsPaged = hits.scoreDocs.length; |
| + } |
| + |
| + final boolean moreHits; |
| + |
| + final ScoreDoc bottomHit; |
| + final ScoreDoc bottomHitShards; |
| + |
| + if (numHitsPaged < hits.totalHits) { |
| + // More hits to page through |
| + moreHits = true; |
| + if (sort == null) { |
| + bottomHit = hits.scoreDocs[hits.scoreDocs.length-1]; |
| + final ScoreDoc sd = shardHits.scoreDocs[shardHits.scoreDocs.length-1]; |
| + // Must copy because below we rebase: |
| + bottomHitShards = new ScoreDoc(sd.doc, sd.score, sd.shardIndex); |
| + if (VERBOSE) { |
| + System.out.println(" save bottomHit=" + bottomHit); |
| + } |
| + } else { |
| + bottomHit = null; |
| + bottomHitShards = null; |
| + } |
| + |
| + } else { |
| + assertEquals(hits.totalHits, numHitsPaged); |
| + bottomHit = null; |
| + bottomHitShards = null; |
| + moreHits = false; |
| + } |
| + |
| + // Must rebase so assertEquals passes: |
| + for(int hitID=0;hitID<shardHits.scoreDocs.length;hitID++) { |
| + final ScoreDoc sd = shardHits.scoreDocs[hitID]; |
| + sd.doc += base[sd.shardIndex]; |
| + } |
| + |
| + _TestUtil.assertEquals(hits, shardHits); |
| + |
| + if (moreHits) { |
| + // Return a continuation: |
| + return new PreviousSearchState(q, sort, bottomHit, bottomHitShards, shardSearcher.nodeVersions, numHitsPaged); |
| + } else { |
| + return null; |
| + } |
| + } |
| +} |
| |
| Property changes on: lucene/src/test/org/apache/lucene/search/TestShardSearching.java |
| ___________________________________________________________________ |
| Added: svn:eol-style |
| ## -0,0 +1 ## |
| +native |
| Index: lucene/src/java/org/apache/lucene/search/SearcherLifetimeManager.java |
| =================================================================== |
| --- lucene/src/java/org/apache/lucene/search/SearcherLifetimeManager.java (revision 1214418) |
| +++ lucene/src/java/org/apache/lucene/search/SearcherLifetimeManager.java (working copy) |
| @@ -23,7 +23,6 @@ |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.ConcurrentHashMap; |
| -import java.util.concurrent.TimeUnit; |
| |
| import org.apache.lucene.search.NRTManager; // javadocs |
| import org.apache.lucene.index.IndexReader; // javadocs |
| @@ -101,9 +100,11 @@ |
| |
| public class SearcherLifetimeManager implements Closeable { |
| |
| + static final double NANOS_PER_SEC = 1000000000.0; |
| + |
| private static class SearcherTracker implements Comparable<SearcherTracker>, Closeable { |
| public final IndexSearcher searcher; |
| - public final long recordTimeSec; |
| + public final double recordTimeSec; |
| public final long version; |
| |
| public SearcherTracker(IndexSearcher searcher) { |
| @@ -112,7 +113,7 @@ |
| searcher.getIndexReader().incRef(); |
| // Use nanoTime not currentTimeMillis since it [in |
| // theory] reduces risk from clock shift |
| - recordTimeSec = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()); |
| + recordTimeSec = System.nanoTime() / NANOS_PER_SEC; |
| } |
| |
| // Newer searchers are sort before older ones: |
| @@ -170,6 +171,7 @@ |
| final long version = searcher.getIndexReader().getVersion(); |
| SearcherTracker tracker = searchers.get(version); |
| if (tracker == null) { |
| + //System.out.println("RECORD version=" + version + " ms=" + System.currentTimeMillis()); |
| tracker = new SearcherTracker(searcher); |
| if (searchers.putIfAbsent(version, tracker) != null) { |
| // Another thread beat us -- must decRef to undo |
| @@ -217,29 +219,28 @@ |
| /** See {@link #prune}. */ |
| public interface Pruner { |
| /** Return true if this searcher should be removed. |
| - * @param ageSec how long ago this searcher was |
| - * recorded vs the most recently recorded |
| - * searcher |
| + * @param ageSec how much time has passed since this |
| + * searcher was the current (live) searcher |
| * @param searcher Searcher |
| **/ |
| - public boolean doPrune(int ageSec, IndexSearcher searcher); |
| + public boolean doPrune(double ageSec, IndexSearcher searcher); |
| } |
| |
| /** Simple pruner that drops any searcher older by |
| * more than the specified seconds, than the newest |
| * searcher. */ |
| public final static class PruneByAge implements Pruner { |
| - private final int maxAgeSec; |
| + private final double maxAgeSec; |
| |
| - public PruneByAge(int maxAgeSec) { |
| - if (maxAgeSec < 1) { |
| + public PruneByAge(double maxAgeSec) { |
| + if (maxAgeSec < 0) { |
| throw new IllegalArgumentException("maxAgeSec must be > 0 (got " + maxAgeSec + ")"); |
| } |
| this.maxAgeSec = maxAgeSec; |
| } |
| |
| @Override |
| - public boolean doPrune(int ageSec, IndexSearcher searcher) { |
| + public boolean doPrune(double ageSec, IndexSearcher searcher) { |
| return ageSec > maxAgeSec; |
| } |
| } |
| @@ -261,14 +262,25 @@ |
| trackers.add(tracker); |
| } |
| Collections.sort(trackers); |
| - final long newestSec = trackers.isEmpty() ? 0L : trackers.get(0).recordTimeSec; |
| + double lastRecordTimeSec = 0.0; |
| + final double now = System.nanoTime()/NANOS_PER_SEC; |
| for (SearcherTracker tracker: trackers) { |
| - final int ageSec = (int) (newestSec - tracker.recordTimeSec); |
| - assert ageSec >= 0; |
| + final double ageSec; |
| + if (lastRecordTimeSec == 0.0) { |
| + ageSec = 0.0; |
| + } else { |
| + ageSec = now - lastRecordTimeSec; |
| + } |
| + // First tracker is always age 0.0 sec, since it's |
| + // still "live"; second tracker's age (= seconds since |
| + // it was "live") is now minus first tracker's |
| + // recordTime, etc: |
| if (pruner.doPrune(ageSec, tracker.searcher)) { |
| + //System.out.println("PRUNE version=" + tracker.version + " age=" + ageSec + " ms=" + System.currentTimeMillis()); |
| searchers.remove(tracker.version); |
| tracker.close(); |
| } |
| + lastRecordTimeSec = tracker.recordTimeSec; |
| } |
| } |
| |
| Index: lucene/src/java/org/apache/lucene/search/TopDocs.java |
| =================================================================== |
| --- lucene/src/java/org/apache/lucene/search/TopDocs.java (revision 1214418) |
| +++ lucene/src/java/org/apache/lucene/search/TopDocs.java (working copy) |
| @@ -216,8 +216,10 @@ |
| float maxScore = Float.MIN_VALUE; |
| for(int shardIDX=0;shardIDX<shardHits.length;shardIDX++) { |
| final TopDocs shard = shardHits[shardIDX]; |
| + // totalHits can be non-zero even if no hits were |
| + // collected, when searchAfter was used: |
| + totalHitCount += shard.totalHits; |
| if (shard.scoreDocs != null && shard.scoreDocs.length > 0) { |
| - totalHitCount += shard.totalHits; |
| availHitCount += shard.scoreDocs.length; |
| queue.add(new ShardRef(shardIDX)); |
| maxScore = Math.max(maxScore, shard.getMaxScore()); |
| @@ -225,6 +227,10 @@ |
| } |
| } |
| |
| + if (availHitCount == 0) { |
| + maxScore = Float.NaN; |
| + } |
| + |
| final ScoreDoc[] hits = new ScoreDoc[Math.min(topN, availHitCount)]; |
| |
| int hitUpto = 0; |
| Index: lucene/src/java/org/apache/lucene/search/ScoreDoc.java |
| =================================================================== |
| --- lucene/src/java/org/apache/lucene/search/ScoreDoc.java (revision 1214418) |
| +++ lucene/src/java/org/apache/lucene/search/ScoreDoc.java (working copy) |
| @@ -46,6 +46,6 @@ |
| // A convenience method for debugging. |
| @Override |
| public String toString() { |
| - return "doc=" + doc + " score=" + score; |
| + return "doc=" + doc + " score=" + score + " shardIndex=" + shardIndex; |
| } |
| } |
| Index: lucene/src/test-framework/java/org/apache/lucene/search/ShardSearchingTestBase.java |
| =================================================================== |
| --- lucene/src/test-framework/java/org/apache/lucene/search/ShardSearchingTestBase.java (revision 0) |
| +++ lucene/src/test-framework/java/org/apache/lucene/search/ShardSearchingTestBase.java (working copy) |
| @@ -0,0 +1,575 @@ |
| +package org.apache.lucene.search; |
| + |
| +/** |
| + * Licensed to the Apache Software Foundation (ASF) under one or more |
| + * contributor license agreements. See the NOTICE file distributed with |
| + * this work for additional information regarding copyright ownership. |
| + * The ASF licenses this file to You under the Apache License, Version 2.0 |
| + * (the "License"); you may not use this file except in compliance with |
| + * the License. You may obtain a copy of the License at |
| + * |
| + * http://www.apache.org/licenses/LICENSE-2.0 |
| + * |
| + * Unless required by applicable law or agreed to in writing, software |
| + * distributed under the License is distributed on an "AS IS" BASIS, |
| + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| + * See the License for the specific language governing permissions and |
| + * limitations under the License. |
| + */ |
| + |
| +import java.io.Closeable; |
| +import java.io.File; |
| +import java.io.IOException; |
| +import java.util.HashMap; |
| +import java.util.HashSet; |
| +import java.util.Map; |
| +import java.util.Random; |
| +import java.util.Set; |
| +import java.util.concurrent.ConcurrentHashMap; |
| + |
| +import org.apache.lucene.index.IndexReader; |
| +import org.apache.lucene.index.IndexWriter; |
| +import org.apache.lucene.index.IndexWriterConfig; |
| +import org.apache.lucene.index.Term; |
| +import org.apache.lucene.analysis.MockAnalyzer; |
| +import org.apache.lucene.store.Directory; |
| +import org.apache.lucene.util.LineFileDocs; |
| +import org.apache.lucene.util.LuceneTestCase; |
| +import org.apache.lucene.util.TermContext; |
| + |
| +// TODO |
| +// - doc blocks? so we can test joins/grouping... |
| +// - controlled consistency (NRTMgr) |
| + |
| +public abstract class ShardSearchingTestBase extends LuceneTestCase { |
| + |
| + // TODO: maybe SLM should throw this instead of returning null... |
| + public static class SearcherExpiredException extends RuntimeException { |
| + public SearcherExpiredException(String message) { |
| + super(message); |
| + } |
| + } |
| + |
| + private static class FieldAndShardVersion { |
| + private final long version; |
| + private final int nodeID; |
| + private final String field; |
| + |
| + public FieldAndShardVersion(int nodeID, long version, String field) { |
| + this.nodeID = nodeID; |
| + this.version = version; |
| + this.field = field; |
| + } |
| + |
| + @Override |
| + public int hashCode() { |
| + return (int) (version * nodeID + field.hashCode()); |
| + } |
| + |
| + @Override |
| + public boolean equals(Object _other) { |
| + if (!(_other instanceof FieldAndShardVersion)) { |
| + return false; |
| + } |
| + |
| + final FieldAndShardVersion other = (FieldAndShardVersion) _other; |
| + |
| + return field.equals(other.field) && version == other.version && nodeID == other.nodeID; |
| + } |
| + |
| + @Override |
| + public String toString() { |
| + return "FieldAndShardVersion(field=" + field + " nodeID=" + nodeID + " version=" + version+ ")"; |
| + } |
| + } |
| + |
| + private static class TermAndShardVersion { |
| + private final long version; |
| + private final int nodeID; |
| + private final Term term; |
| + |
| + public TermAndShardVersion(int nodeID, long version, Term term) { |
| + this.nodeID = nodeID; |
| + this.version = version; |
| + this.term = term; |
| + } |
| + |
| + @Override |
| + public int hashCode() { |
| + return (int) (version * nodeID + term.hashCode()); |
| + } |
| + |
| + @Override |
| + public boolean equals(Object _other) { |
| + if (!(_other instanceof TermAndShardVersion)) { |
| + return false; |
| + } |
| + |
| + final TermAndShardVersion other = (TermAndShardVersion) _other; |
| + |
| + return term.equals(other.term) && version == other.version && nodeID == other.nodeID; |
| + } |
| + } |
| + |
| + // We share collection stats for these fields on each node |
| + // reopen: |
| + private final String[] fieldsToShare = new String[] {"body", "title"}; |
| + |
| + // Called by one node once it has reopened, to notify all |
| + // other nodes. This is just a mock (since it goes and |
| + // directly updates all other nodes, in RAM)... in a real |
| + // env this would hit the wire, sending version & |
| + // collection stats to all other nodes: |
| + void broadcastNodeReopen(int nodeID, long version, IndexSearcher newSearcher) throws IOException { |
| + |
| + if (VERBOSE) { |
| + System.out.println("REOPEN: nodeID=" + nodeID + " version=" + version + " maxDoc=" + newSearcher.getIndexReader().maxDoc()); |
| + } |
| + |
| + // Broadcast new collection stats for this node to all |
| + // other nodes: |
| + for(String field : fieldsToShare) { |
| + final CollectionStatistics stats = newSearcher.collectionStatistics(field); |
| + for (NodeState node : nodes) { |
| + // Don't put my own collection stats into the cache; |
| + // we pull locally: |
| + if (node.myNodeID != nodeID) { |
| + node.collectionStatsCache.put(new FieldAndShardVersion(nodeID, version, field), stats); |
| + } |
| + } |
| + } |
| + for (NodeState node : nodes) { |
| + node.updateNodeVersion(nodeID, version); |
| + } |
| + } |
| + |
| + // MOCK: in a real env you have to hit the wire |
| + // (send this query to all remote nodes |
| + // concurrently): |
| + TopDocs searchNode(int nodeID, long[] nodeVersions, Query q, Sort sort, int numHits, ScoreDoc searchAfter) throws IOException { |
| + final NodeState.ShardIndexSearcher s = nodes[nodeID].acquire(nodeVersions); |
| + try { |
| + if (sort == null) { |
| + if (searchAfter != null) { |
| + return s.localSearchAfter(searchAfter, q, numHits); |
| + } else { |
| + return s.localSearch(q, numHits); |
| + } |
| + } else { |
| + assert searchAfter == null; // not supported yet |
| + return s.localSearch(q, numHits, sort); |
| + } |
| + } finally { |
| + nodes[nodeID].release(s); |
| + } |
| + } |
| + |
| + // Mock: in a real env, this would hit the wire and get |
| + // term stats from remote node |
| + Map<Term,TermStatistics> getNodeTermStats(Set<Term> terms, int nodeID, long version) throws IOException { |
| + final NodeState node = nodes[nodeID]; |
| + final Map<Term,TermStatistics> stats = new HashMap<Term,TermStatistics>(); |
| + final IndexSearcher s = node.searchers.acquire(version); |
| + if (s == null) { |
| + throw new SearcherExpiredException("node=" + nodeID + " version=" + version); |
| + } |
| + try { |
| + for(Term term : terms) { |
| + final TermContext termContext = TermContext.build(s.getIndexReader().getTopReaderContext(), term, false); |
| + stats.put(term, s.termStatistics(term, termContext)); |
| + } |
| + } finally { |
| + node.searchers.release(s); |
| + } |
| + return stats; |
| + } |
| + |
| + protected final class NodeState implements Closeable { |
| + public final Directory dir; |
| + public final IndexWriter writer; |
| + public final SearcherLifetimeManager searchers; |
| + public final SearcherManager mgr; |
| + public final int myNodeID; |
| + public final long[] currentNodeVersions; |
| + |
| + // TODO: nothing evicts from here!!! Somehow, on searcher |
| + // expiration on remote nodes we must evict from our |
| + // local cache...? |
| + |
| + private final Map<FieldAndShardVersion,CollectionStatistics> collectionStatsCache = new ConcurrentHashMap<FieldAndShardVersion,CollectionStatistics>(); |
| + private final Map<TermAndShardVersion,TermStatistics> termStatsCache = new ConcurrentHashMap<TermAndShardVersion,TermStatistics>(); |
| + |
| + /** Matches docs in the local shard but scores based on |
| + * aggregated stats ("mock distributed scoring") from all |
| + * nodes. */ |
| + |
| + public class ShardIndexSearcher extends IndexSearcher { |
| + // Version for the node searchers we search: |
| + public final long[] nodeVersions; |
| + public final int myNodeID; |
| + |
| + public ShardIndexSearcher(long[] nodeVersions, IndexReader localReader, int nodeID) { |
| + super(localReader); |
| + this.nodeVersions = nodeVersions; |
| + myNodeID = nodeID; |
| + assert myNodeID == NodeState.this.myNodeID: "myNodeID=" + nodeID + " NodeState.this.myNodeID=" + NodeState.this.myNodeID; |
| + } |
| + |
| + @Override |
| + public Query rewrite(Query original) throws IOException { |
| + final Query rewritten = super.rewrite(original); |
| + final Set<Term> terms = new HashSet<Term>(); |
| + rewritten.extractTerms(terms); |
| + |
| + // Make a single request to remote nodes for term |
| + // stats: |
| + for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) { |
| + if (nodeID == myNodeID) { |
| + continue; |
| + } |
| + |
| + final Set<Term> missing = new HashSet<Term>(); |
| + for(Term term : terms) { |
| + final TermAndShardVersion key = new TermAndShardVersion(nodeID, nodeVersions[nodeID], term); |
| + if (!termStatsCache.containsKey(key)) { |
| + missing.add(term); |
| + } |
| + } |
| + if (missing.size() != 0) { |
| + for(Map.Entry<Term,TermStatistics> ent : getNodeTermStats(missing, nodeID, nodeVersions[nodeID]).entrySet()) { |
| + final TermAndShardVersion key = new TermAndShardVersion(nodeID, nodeVersions[nodeID], ent.getKey()); |
| + termStatsCache.put(key, ent.getValue()); |
| + } |
| + } |
| + } |
| + |
| + return rewritten; |
| + } |
| + |
| + @Override |
| + public TermStatistics termStatistics(Term term, TermContext context) throws IOException { |
| + assert term != null; |
| + int docFreq = 0; |
| + long totalTermFreq = 0; |
| + for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) { |
| + |
| + final TermStatistics subStats; |
| + if (nodeID == myNodeID) { |
| + subStats = super.termStatistics(term, context); |
| + } else { |
| + final TermAndShardVersion key = new TermAndShardVersion(nodeID, nodeVersions[nodeID], term); |
| + subStats = termStatsCache.get(key); |
| + // We pre-cached during rewrite so all terms |
| + // better be here... |
| + assert subStats != null; |
| + } |
| + |
| + docFreq += subStats.docFreq(); |
| + totalTermFreq += subStats.totalTermFreq(); |
| + } |
| + |
| + return new TermStatistics(term.bytes(), docFreq, totalTermFreq); |
| + } |
| + |
| + @Override |
| + public CollectionStatistics collectionStatistics(String field) throws IOException { |
| + // TODO: we could compute this on init and cache, |
| + // since we are re-inited whenever any nodes have a |
| + // new reader |
| + int docCount = 0; |
| + long sumTotalTermFreq = 0; |
| + long sumDocFreq = 0; |
| + int maxDoc = 0; |
| + |
| + for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) { |
| + final FieldAndShardVersion key = new FieldAndShardVersion(nodeID, nodeVersions[nodeID], field); |
| + final CollectionStatistics nodeStats; |
| + if (nodeID == myNodeID) { |
| + nodeStats = super.collectionStatistics(field); |
| + } else { |
| + nodeStats = collectionStatsCache.get(key); |
| + } |
| + if (nodeStats == null) { |
| + System.out.println("coll stats myNodeID=" + myNodeID + ": " + collectionStatsCache.keySet()); |
| + } |
| + // Collection stats are pre-shared on reopen, so, |
| + // we better not have a cache miss: |
| + assert nodeStats != null: "myNodeID=" + myNodeID + " nodeID=" + nodeID + " version=" + nodeVersions[nodeID] + " field=" + field; |
| + docCount += nodeStats.docCount(); |
| + sumTotalTermFreq += nodeStats.sumTotalTermFreq(); |
| + sumDocFreq += nodeStats.sumDocFreq(); |
| + maxDoc += nodeStats.maxDoc(); |
| + } |
| + |
| + return new CollectionStatistics(field, maxDoc, docCount, sumTotalTermFreq, sumDocFreq); |
| + } |
| + |
| + @Override |
| + public TopDocs search(Query query, int numHits) throws IOException { |
| + final TopDocs[] shardHits = new TopDocs[nodeVersions.length]; |
| + for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) { |
| + if (nodeID == myNodeID) { |
| + // My node; run using local shard searcher we |
| + // already aquired: |
| + shardHits[nodeID] = localSearch(query, numHits); |
| + } else { |
| + shardHits[nodeID] = searchNode(nodeID, nodeVersions, query, null, numHits, null); |
| + } |
| + } |
| + |
| + // Merge: |
| + return TopDocs.merge(null, numHits, shardHits); |
| + } |
| + |
| + public TopDocs localSearch(Query query, int numHits) throws IOException { |
| + return super.search(query, numHits); |
| + } |
| + |
| + @Override |
| + public TopDocs searchAfter(ScoreDoc after, Query query, int numHits) throws IOException { |
| + final TopDocs[] shardHits = new TopDocs[nodeVersions.length]; |
| + ScoreDoc shardAfter = new ScoreDoc(after.doc, after.score); |
| + for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) { |
| + if (nodeID < after.shardIndex) { |
| + // If score is tied then no docs in this shard |
| + // should be collected: |
| + shardAfter.doc = Integer.MAX_VALUE; |
| + } else if (nodeID == after.shardIndex) { |
| + // If score is tied then we break according to |
| + // docID (like normal): |
| + shardAfter.doc = after.doc; |
| + } else { |
| + // If score is tied then all docs in this shard |
| + // should be collected, because they come after |
| + // the previous bottom: |
| + shardAfter.doc = -1; |
| + } |
| + if (nodeID == myNodeID) { |
| + // My node; run using local shard searcher we |
| + // already aquired: |
| + shardHits[nodeID] = localSearchAfter(shardAfter, query, numHits); |
| + } else { |
| + shardHits[nodeID] = searchNode(nodeID, nodeVersions, query, null, numHits, shardAfter); |
| + } |
| + //System.out.println(" node=" + nodeID + " totHits=" + shardHits[nodeID].totalHits); |
| + } |
| + |
| + // Merge: |
| + return TopDocs.merge(null, numHits, shardHits); |
| + } |
| + |
| + public TopDocs localSearchAfter(ScoreDoc after, Query query, int numHits) throws IOException { |
| + return super.searchAfter(after, query, numHits); |
| + } |
| + |
| + @Override |
| + public TopFieldDocs search(Query query, int numHits, Sort sort) throws IOException { |
| + assert sort != null; |
| + final TopDocs[] shardHits = new TopDocs[nodeVersions.length]; |
| + for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) { |
| + if (nodeID == myNodeID) { |
| + // My node; run using local shard searcher we |
| + // already aquired: |
| + shardHits[nodeID] = localSearch(query, numHits, sort); |
| + } else { |
| + shardHits[nodeID] = searchNode(nodeID, nodeVersions, query, sort, numHits, null); |
| + } |
| + } |
| + |
| + // Merge: |
| + return (TopFieldDocs) TopDocs.merge(sort, numHits, shardHits); |
| + } |
| + |
| + public TopFieldDocs localSearch(Query query, int numHits, Sort sort) throws IOException { |
| + return super.search(query, numHits, sort); |
| + } |
| + |
| + } |
| + |
| + private volatile ShardIndexSearcher currentShardSearcher; |
| + |
| + public NodeState(Random random, String baseDir, int nodeID, int numNodes) throws IOException { |
| + myNodeID = nodeID; |
| + dir = newFSDirectory(new File(baseDir + "." + myNodeID)); |
| + // TODO: set warmer |
| + writer = new IndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))); |
| + mgr = new SearcherManager(writer, true, null); |
| + searchers = new SearcherLifetimeManager(); |
| + |
| + // Init w/ 0s... caller above will do initial |
| + // "broadcast" by calling initSearcher: |
| + currentNodeVersions = new long[numNodes]; |
| + } |
| + |
| + public void initSearcher(long[] nodeVersions) { |
| + assert currentShardSearcher == null; |
| + System.arraycopy(nodeVersions, 0, currentNodeVersions, 0, currentNodeVersions.length); |
| + currentShardSearcher = new ShardIndexSearcher(currentNodeVersions.clone(), |
| + mgr.acquire().getIndexReader(), |
| + myNodeID); |
| + } |
| + |
| + public void updateNodeVersion(int nodeID, long version) throws IOException { |
| + currentNodeVersions[nodeID] = version; |
| + if (currentShardSearcher != null) { |
| + currentShardSearcher.getIndexReader().decRef(); |
| + } |
| + currentShardSearcher = new ShardIndexSearcher(currentNodeVersions.clone(), |
| + mgr.acquire().getIndexReader(), |
| + myNodeID); |
| + } |
| + |
| + // Get the current (fresh) searcher for this node |
| + public ShardIndexSearcher acquire() { |
| + final ShardIndexSearcher s = currentShardSearcher; |
| + // TODO: this isn't thread safe.... in theory the |
| + // reader could get decRef'd to 0 before we have a |
| + // chance to incRef, ie if a reopen happens right |
| + // after the above line, this thread gets stalled, and |
| + // the old IR is closed. But because we use SLM in |
| + // this test, this will be exceptionally rare: |
| + s.getIndexReader().incRef(); |
| + return s; |
| + } |
| + |
| + public void release(ShardIndexSearcher s) throws IOException { |
| + s.getIndexReader().decRef(); |
| + } |
| + |
| + // Get and old searcher matching the specified versions: |
| + public ShardIndexSearcher acquire(long[] nodeVersions) { |
| + final IndexSearcher s = searchers.acquire(nodeVersions[myNodeID]); |
| + if (s == null) { |
| + throw new SearcherExpiredException("nodeID=" + myNodeID + " version=" + nodeVersions[myNodeID]); |
| + } |
| + return new ShardIndexSearcher(nodeVersions, s.getIndexReader(), myNodeID); |
| + } |
| + |
| + // Reopen local reader |
| + public void reopen() throws IOException { |
| + final IndexSearcher before = mgr.acquire(); |
| + mgr.release(before); |
| + |
| + mgr.maybeReopen(); |
| + final IndexSearcher after = mgr.acquire(); |
| + try { |
| + if (after != before) { |
| + // New searcher was opened |
| + final long version = searchers.record(after); |
| + searchers.prune(new SearcherLifetimeManager.PruneByAge(maxSearcherAgeSeconds)); |
| + broadcastNodeReopen(myNodeID, version, after); |
| + } |
| + } finally { |
| + mgr.release(after); |
| + } |
| + } |
| + |
| + @Override |
| + public void close() throws IOException { |
| + if (currentShardSearcher != null) { |
| + currentShardSearcher.getIndexReader().decRef(); |
| + } |
| + searchers.close(); |
| + mgr.close(); |
| + writer.close(); |
| + dir.close(); |
| + } |
| + } |
| + |
| + // TODO: make this more realistic, ie, each node should |
| + // have its own thread, so we have true node to node |
| + // concurrency |
| + private final class ChangeIndices extends Thread { |
| + @Override |
| + public void run() { |
| + try { |
| + final LineFileDocs docs = new LineFileDocs(random); |
| + int numDocs = 0; |
| + while (System.nanoTime() < endTimeNanos) { |
| + final int what = random.nextInt(3); |
| + final NodeState node = nodes[random.nextInt(nodes.length)]; |
| + if (numDocs == 0 || what == 0) { |
| + node.writer.addDocument(docs.nextDoc()); |
| + numDocs++; |
| + } else if (what == 1) { |
| + node.writer.updateDocument(new Term("docid", ""+random.nextInt(numDocs)), |
| + docs.nextDoc()); |
| + numDocs++; |
| + } else { |
| + node.writer.deleteDocuments(new Term("docid", ""+random.nextInt(numDocs))); |
| + } |
| + // TODO: doc blocks too |
| + |
| + if (random.nextInt(17) == 12) { |
| + node.writer.commit(); |
| + } |
| + |
| + if (random.nextInt(17) == 12) { |
| + nodes[random.nextInt(nodes.length)].reopen(); |
| + } |
| + } |
| + } catch (Throwable t) { |
| + System.out.println("FAILED:"); |
| + t.printStackTrace(System.out); |
| + throw new RuntimeException(t); |
| + } |
| + } |
| + } |
| + |
| + protected NodeState[] nodes; |
| + int maxSearcherAgeSeconds; |
| + long endTimeNanos; |
| + private Thread changeIndicesThread; |
| + |
| + protected void start(String baseDirName, int numNodes, double runTimeSec, int maxSearcherAgeSeconds) throws IOException { |
| + |
| + endTimeNanos = System.nanoTime() + (long) (runTimeSec*1000000000); |
| + this.maxSearcherAgeSeconds = maxSearcherAgeSeconds; |
| + |
| + nodes = new NodeState[numNodes]; |
| + for(int nodeID=0;nodeID<numNodes;nodeID++) { |
| + nodes[nodeID] = new NodeState(random, baseDirName, nodeID, numNodes); |
| + } |
| + |
| + long[] nodeVersions = new long[nodes.length]; |
| + for(int nodeID=0;nodeID<numNodes;nodeID++) { |
| + final IndexSearcher s = nodes[nodeID].mgr.acquire(); |
| + try { |
| + nodeVersions[nodeID] = nodes[nodeID].searchers.record(s); |
| + } finally { |
| + nodes[nodeID].mgr.release(s); |
| + } |
| + } |
| + |
| + for(int nodeID=0;nodeID<numNodes;nodeID++) { |
| + final IndexSearcher s = nodes[nodeID].mgr.acquire(); |
| + assert nodeVersions[nodeID] == nodes[nodeID].searchers.record(s); |
| + assert s != null; |
| + try { |
| + broadcastNodeReopen(nodeID, nodeVersions[nodeID], s); |
| + } finally { |
| + nodes[nodeID].mgr.release(s); |
| + } |
| + } |
| + |
| + changeIndicesThread = new ChangeIndices(); |
| + changeIndicesThread.start(); |
| + } |
| + |
| + protected void finish() throws InterruptedException, IOException { |
| + changeIndicesThread.join(); |
| + for(NodeState node : nodes) { |
| + node.close(); |
| + } |
| + } |
| + |
| + protected static class SearcherAndVersion { |
| + public final IndexSearcher searcher; |
| + public final long version; |
| + |
| + public SearcherAndVersion(IndexSearcher searcher, long version) { |
| + this.searcher = searcher; |
| + this.version = version; |
| + } |
| + } |
| +} |
| |
| Property changes on: lucene/src/test-framework/java/org/apache/lucene/search/ShardSearchingTestBase.java |
| ___________________________________________________________________ |
| Added: svn:eol-style |
| ## -0,0 +1 ## |
| +native |