Index: lucene/CHANGES.txt
===================================================================
--- lucene/CHANGES.txt (revision 1214418)
+++ lucene/CHANGES.txt (working copy)
@@ -751,6 +751,12 @@
where they would create invalid offsets in some situations, leading to problems
in highlighting. (Max Beutel via Robert Muir)
+* LUCENE-3639: TopDocs.merge was incorrectly setting TopDocs.maxScore to
+ Float.MIN_VALUE when there were 0 hits; it now sets it to Float.NaN.
+ Also improved the age calculation in SearcherLifetimeManager to use
+ double precision and to compute a searcher's age as how long ago it
+ was replaced by a new searcher. (Mike McCandless)
+
Documentation
* LUCENE-3597: Fixed incorrect grouping documentation. (Martijn van Groningen, Robert Muir)
Index: lucene/src/test/org/apache/lucene/search/TestShardSearching.java
===================================================================
--- lucene/src/test/org/apache/lucene/search/TestShardSearching.java (revision 0)
+++ lucene/src/test/org/apache/lucene/search/TestShardSearching.java (working copy)
@@ -0,0 +1,393 @@
+package org.apache.lucene.search;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.MultiFields;
+import org.apache.lucene.index.MultiReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LuceneTestCase.UseNoMemoryExpensiveCodec;
+import org.apache.lucene.util._TestUtil;
+
+// TODO
+// - other queries besides PrefixQuery & TermQuery (but:
+// FuzzyQ will be problematic... the top N terms it
+// takes means results will differ)
+// - NRQ/F
+// - BQ, negated clauses, negated prefix clauses
+// - test pulling docs in 2nd round trip...
+// - filter too
+
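+// Runs random TermQuery/PrefixQuery searches (optionally sorted, optionally
+// paged with searchAfter) against the mock shard searchers and asserts the
+// merged shard results match an equivalent single-searcher (MultiReader) search.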
+@UseNoMemoryExpensiveCodec
+public class TestShardSearching extends ShardSearchingTestBase {
+
+ private static class PreviousSearchState {
+ public final long searchTimeNanos;
+ public final long[] versions;
+ public final ScoreDoc searchAfterLocal;
+ public final ScoreDoc searchAfterShard;
+ public final Sort sort;
+ public final Query query;
+ public final int numHitsPaged;
+
+ public PreviousSearchState(Query query, Sort sort, ScoreDoc searchAfterLocal, ScoreDoc searchAfterShard, long[] versions, int numHitsPaged) {
+ this.versions = versions.clone();
+ this.searchAfterLocal = searchAfterLocal;
+ this.searchAfterShard = searchAfterShard;
+ this.sort = sort;
+ this.query = query;
+ this.numHitsPaged = numHitsPaged;
+ searchTimeNanos = System.nanoTime();
+ }
+ }
+
+ public void testSimple() throws Exception {
+ final int numNodes = _TestUtil.nextInt(random, 1, 10);
+
+ final double runTimeSec = 5.0 * RANDOM_MULTIPLIER;
+
+ final int minDocsToMakeTerms = _TestUtil.nextInt(random, 5, 20);
+
+ final int maxSearcherAgeSeconds = _TestUtil.nextInt(random, 1, 4);
+
+ if (VERBOSE) {
+ System.out.println("TEST: numNodes=" + numNodes + " runTimeSec=" + runTimeSec + " maxSearcherAgeSeconds=" + maxSearcherAgeSeconds);
+ }
+
+ start(_TestUtil.getTempDir("TestShardSearching").toString(),
+ numNodes,
+ runTimeSec,
+ maxSearcherAgeSeconds
+ );
+
+ final List<PreviousSearchState> priorSearches = new ArrayList<PreviousSearchState>();
+ List<BytesRef> terms = null;
+ while (System.nanoTime() < endTimeNanos) {
+
+ final boolean doFollowon = priorSearches.size() > 0 && random.nextInt(7) == 1;
+
+ // Pick a random node; we will run the query on this node:
+ final int myNodeID = random.nextInt(numNodes);
+
+ final NodeState.ShardIndexSearcher localShardSearcher;
+
+ final PreviousSearchState prevSearchState;
+
+ if (doFollowon) {
+ // Pretend the user issued a follow-on query:
+ prevSearchState = priorSearches.get(random.nextInt(priorSearches.size()));
+
+ if (VERBOSE) {
+ System.out.println("\nTEST: follow-on query age=" + ((System.nanoTime() - prevSearchState.searchTimeNanos)/1000000000.0));
+ }
+
+ try {
+ localShardSearcher = nodes[myNodeID].acquire(prevSearchState.versions);
+ } catch (SearcherExpiredException see) {
+ // Expected, sometimes; in a "real" app we would
+ // either forward this error to the user ("too
+ // much time has passed; please re-run your
+ // search") or sneakily just switch to newest
+ // searcher w/o telling them...
+ if (VERBOSE) {
+ System.out.println(" searcher expired during local shard searcher init: " + see);
+ }
+ priorSearches.remove(prevSearchState);
+ continue;
+ }
+ } else {
+ if (VERBOSE) {
+ System.out.println("\nTEST: fresh query");
+ }
+ // Do fresh query:
+ localShardSearcher = nodes[myNodeID].acquire();
+ prevSearchState = null;
+ }
+
+ final IndexReader[] subs = new IndexReader[numNodes];
+
+ PreviousSearchState searchState = null;
+
+ try {
+
+ // Mock: now make a single reader (MultiReader) from all node
+ // searchers. In a real shard env you can't do this... we
+ // do it to confirm results from the shard searcher
+ // are correct:
+ int docCount = 0;
+ try {
+ for(int nodeID=0;nodeID<numNodes;nodeID++) {
+ final long subVersion = localShardSearcher.nodeVersions[nodeID];
+ final IndexSearcher sub = nodes[nodeID].searchers.acquire(subVersion);
+ if (sub == null) {
+ // Release the sub readers we already acquired; report the
+ // expired node's ID (not a decremented cleanup index):
+ for(int releaseID=nodeID-1;releaseID>=0;releaseID--) {
+ subs[releaseID].decRef();
+ subs[releaseID] = null;
+ }
+ throw new SearcherExpiredException("nodeID=" + nodeID + " version=" + subVersion);
+ }
+ subs[nodeID] = sub.getIndexReader();
+ docCount += subs[nodeID].maxDoc();
+ }
+ } catch (SearcherExpiredException see) {
+ // Expected
+ if (VERBOSE) {
+ System.out.println(" searcher expired during mock reader init: " + see);
+ }
+ continue;
+ }
+
+ final IndexReader mockReader = new MultiReader(subs);
+ final IndexSearcher mockSearcher = new IndexSearcher(mockReader);
+
+ Query query;
+ Sort sort;
+
+ if (prevSearchState != null) {
+ query = prevSearchState.query;
+ sort = prevSearchState.sort;
+ } else {
+ if (terms == null && docCount > minDocsToMakeTerms) {
+ // TODO: try to "focus" on high freq terms sometimes too
+ // TODO: maybe also periodically reset the terms...?
+ final TermsEnum termsEnum = MultiFields.getTerms(mockReader, "body").iterator(null);
+ terms = new ArrayList<BytesRef>();
+ while(termsEnum.next() != null) {
+ terms.add(BytesRef.deepCopyOf(termsEnum.term()));
+ }
+ if (VERBOSE) {
+ System.out.println("TEST: init terms: " + terms.size() + " terms");
+ }
+ if (terms.size() == 0) {
+ terms = null;
+ }
+ }
+
+ if (VERBOSE) {
+ System.out.println(" maxDoc=" + mockReader.maxDoc());
+ }
+
+ if (terms != null) {
+ if (random.nextBoolean()) {
+ query = new TermQuery(new Term("body", terms.get(random.nextInt(terms.size()))));
+ } else {
+ final String t = terms.get(random.nextInt(terms.size())).utf8ToString();
+ final String prefix;
+ if (t.length() <= 1) {
+ prefix = t;
+ } else {
+ prefix = t.substring(0, _TestUtil.nextInt(random, 1, 2));
+ }
+ query = new PrefixQuery(new Term("body", prefix));
+ }
+
+ if (random.nextBoolean()) {
+ sort = null;
+ } else {
+ // TODO: sort by more than 1 field
+ final int what = random.nextInt(4);
+ if (what == 0) {
+ sort = new Sort(SortField.FIELD_SCORE);
+ } else if (what == 1) {
+ // TODO: this sort doesn't merge
+ // correctly... it's tricky because you
+ // could have > 2.1B docs across all shards:
+ //sort = new Sort(SortField.FIELD_DOC);
+ sort = null;
+ } else if (what == 2) {
+ sort = new Sort(new SortField[] {new SortField("docid", SortField.Type.INT, random.nextBoolean())});
+ } else {
+ sort = new Sort(new SortField[] {new SortField("title", SortField.Type.STRING, random.nextBoolean())});
+ }
+ }
+ } else {
+ query = null;
+ sort = null;
+ }
+ }
+
+ if (query != null) {
+
+ try {
+ searchState = assertSame(mockSearcher, localShardSearcher, query, sort, prevSearchState);
+ } catch (SearcherExpiredException see) {
+ // Expected; in a "real" app we would
+ // either forward this error to the user ("too
+ // much time has passed; please re-run your
+ // search") or sneakily just switch to newest
+ // searcher w/o telling them...
+ if (VERBOSE) {
+ System.out.println(" searcher expired during search: " + see);
+ see.printStackTrace(System.out);
+ }
+ assert prevSearchState != null;
+ priorSearches.remove(prevSearchState);
+ }
+ }
+ } finally {
+ nodes[myNodeID].release(localShardSearcher);
+ for(IndexReader sub : subs) {
+ if (sub != null) {
+ sub.decRef();
+ }
+ }
+ }
+
+ if (searchState != null && searchState.searchAfterLocal != null && random.nextInt(5) == 3) {
+ priorSearches.add(searchState);
+ if (priorSearches.size() > 200) {
+ Collections.shuffle(priorSearches, random);
+ priorSearches.subList(100, priorSearches.size()).clear();
+ }
+ }
+ }
+
+ finish();
+ }
+
+ private PreviousSearchState assertSame(IndexSearcher mockSearcher, NodeState.ShardIndexSearcher shardSearcher, Query q, Sort sort, PreviousSearchState state) throws IOException {
+
+ int numHits = _TestUtil.nextInt(random, 1, 100);
+ if (state != null && state.searchAfterLocal == null) {
+ // In addition to what we last searched:
+ numHits += state.numHitsPaged;
+ }
+
+ if (VERBOSE) {
+ System.out.println("TEST: query=" + q + " sort=" + sort + " numHits=" + numHits);
+ if (state != null) {
+ System.out.println(" prev: searchAfterLocal=" + state.searchAfterLocal + " searchAfterShard=" + state.searchAfterShard + " numHitsPaged=" + state.numHitsPaged);
+ }
+ }
+
+ // Single (mock local) searcher:
+ final TopDocs hits;
+ if (sort == null) {
+ if (state != null && state.searchAfterLocal != null) {
+ hits = mockSearcher.searchAfter(state.searchAfterLocal, q, numHits);
+ } else {
+ hits = mockSearcher.search(q, numHits);
+ }
+ } else {
+ hits = mockSearcher.search(q, numHits, sort);
+ }
+
+ // Shard searcher
+ final TopDocs shardHits;
+ if (sort == null) {
+ if (state != null && state.searchAfterShard != null) {
+ shardHits = shardSearcher.searchAfter(state.searchAfterShard, q, numHits);
+ } else {
+ shardHits = shardSearcher.search(q, numHits);
+ }
+ } else {
+ shardHits = shardSearcher.search(q, numHits, sort);
+ }
+
+ final int numNodes = shardSearcher.nodeVersions.length;
+ int[] base = new int[numNodes];
+ final IndexReader[] subs = mockSearcher.getIndexReader().getSequentialSubReaders();
+ assertEquals(numNodes, subs.length);
+
+ int docCount = 0;
+ for(int nodeID=0;nodeID<numNodes;nodeID++) {
+ base[nodeID] = docCount;
+ docCount += subs[nodeID].maxDoc();
+ }
+
+ if (VERBOSE) {
+ /*
+ for(int shardID=0;shardID<shardSearchers.length;shardID++) {
+ System.out.println(" shard=" + shardID + " maxDoc=" + shardSearchers[shardID].searcher.getIndexReader().maxDoc());
+ }
+ */
+ System.out.println(" single searcher: " + hits.totalHits + " totalHits maxScore=" + hits.getMaxScore());
+ for(int i=0;i<hits.scoreDocs.length;i++) {
+ final ScoreDoc sd = hits.scoreDocs[i];
+ System.out.println(" doc=" + sd.doc + " score=" + sd.score);
+ }
+ System.out.println(" shard searcher: " + shardHits.totalHits + " totalHits maxScore=" + shardHits.getMaxScore());
+ for(int i=0;i<shardHits.scoreDocs.length;i++) {
+ final ScoreDoc sd = shardHits.scoreDocs[i];
+ System.out.println(" doc=" + sd.doc + " (rebased: " + (sd.doc + base[sd.shardIndex]) + ") score=" + sd.score + " shard=" + sd.shardIndex);
+ }
+ }
+
+ int numHitsPaged = hits.scoreDocs.length;
+ if (state != null && state.searchAfterLocal != null) {
+ numHitsPaged += state.numHitsPaged;
+ }
+
+ final boolean moreHits;
+
+ final ScoreDoc bottomHit;
+ final ScoreDoc bottomHitShards;
+
+ if (numHitsPaged < hits.totalHits) {
+ // More hits to page through
+ moreHits = true;
+ if (sort == null) {
+ bottomHit = hits.scoreDocs[hits.scoreDocs.length-1];
+ final ScoreDoc sd = shardHits.scoreDocs[shardHits.scoreDocs.length-1];
+ // Must copy because below we rebase:
+ bottomHitShards = new ScoreDoc(sd.doc, sd.score, sd.shardIndex);
+ if (VERBOSE) {
+ System.out.println(" save bottomHit=" + bottomHit);
+ }
+ } else {
+ bottomHit = null;
+ bottomHitShards = null;
+ }
+
+ } else {
+ assertEquals(hits.totalHits, numHitsPaged);
+ bottomHit = null;
+ bottomHitShards = null;
+ moreHits = false;
+ }
+
+ // Must rebase so assertEquals passes:
+ for(int hitID=0;hitID<shardHits.scoreDocs.length;hitID++) {
+ final ScoreDoc sd = shardHits.scoreDocs[hitID];
+ sd.doc += base[sd.shardIndex];
+ }
+
+ _TestUtil.assertEquals(hits, shardHits);
+
+ if (moreHits) {
+ // Return a continuation:
+ return new PreviousSearchState(q, sort, bottomHit, bottomHitShards, shardSearcher.nodeVersions, numHitsPaged);
+ } else {
+ return null;
+ }
+ }
+}
Property changes on: lucene/src/test/org/apache/lucene/search/TestShardSearching.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
Index: lucene/src/java/org/apache/lucene/search/SearcherLifetimeManager.java
===================================================================
--- lucene/src/java/org/apache/lucene/search/SearcherLifetimeManager.java (revision 1214418)
+++ lucene/src/java/org/apache/lucene/search/SearcherLifetimeManager.java (working copy)
@@ -23,7 +23,6 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import org.apache.lucene.search.NRTManager; // javadocs
import org.apache.lucene.index.IndexReader; // javadocs
@@ -101,9 +100,11 @@
public class SearcherLifetimeManager implements Closeable {
+ static final double NANOS_PER_SEC = 1000000000.0;
+
private static class SearcherTracker implements Comparable<SearcherTracker>, Closeable {
public final IndexSearcher searcher;
- public final long recordTimeSec;
+ public final double recordTimeSec;
public final long version;
public SearcherTracker(IndexSearcher searcher) {
@@ -112,7 +113,7 @@
searcher.getIndexReader().incRef();
// Use nanoTime not currentTimeMillis since it [in
// theory] reduces risk from clock shift
- recordTimeSec = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime());
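+ // Record seconds as a double so ages keep sub-second precision: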
+ recordTimeSec = System.nanoTime() / NANOS_PER_SEC;
}
// Newer searchers are sorted before older ones:
@@ -170,6 +171,7 @@
final long version = searcher.getIndexReader().getVersion();
SearcherTracker tracker = searchers.get(version);
if (tracker == null) {
+ //System.out.println("RECORD version=" + version + " ms=" + System.currentTimeMillis());
tracker = new SearcherTracker(searcher);
if (searchers.putIfAbsent(version, tracker) != null) {
// Another thread beat us -- must decRef to undo
@@ -217,29 +219,28 @@
/** See {@link #prune}. */
public interface Pruner {
/** Return true if this searcher should be removed.
- * @param ageSec how long ago this searcher was
- * recorded vs the most recently recorded
- * searcher
+ * @param ageSec how much time has passed since this
+ * searcher was the current (live) searcher
* @param searcher Searcher
**/
- public boolean doPrune(int ageSec, IndexSearcher searcher);
+ public boolean doPrune(double ageSec, IndexSearcher searcher);
}
/** Simple pruner that drops any searcher older by
* more than the specified seconds, than the newest
* searcher. */
public final static class PruneByAge implements Pruner {
- private final int maxAgeSec;
+ private final double maxAgeSec;
- public PruneByAge(int maxAgeSec) {
- if (maxAgeSec < 1) {
+ public PruneByAge(double maxAgeSec) {
+ if (maxAgeSec < 0) {
throw new IllegalArgumentException("maxAgeSec must be > 0 (got " + maxAgeSec + ")");
}
this.maxAgeSec = maxAgeSec;
}
@Override
- public boolean doPrune(int ageSec, IndexSearcher searcher) {
+ public boolean doPrune(double ageSec, IndexSearcher searcher) {
return ageSec > maxAgeSec;
}
}
@@ -261,14 +262,25 @@
trackers.add(tracker);
}
Collections.sort(trackers);
- final long newestSec = trackers.isEmpty() ? 0L : trackers.get(0).recordTimeSec;
+ double lastRecordTimeSec = 0.0;
+ final double now = System.nanoTime()/NANOS_PER_SEC;
for (SearcherTracker tracker: trackers) {
- final int ageSec = (int) (newestSec - tracker.recordTimeSec);
- assert ageSec >= 0;
+ final double ageSec;
+ if (lastRecordTimeSec == 0.0) {
+ ageSec = 0.0;
+ } else {
+ ageSec = now - lastRecordTimeSec;
+ }
+ // First tracker is always age 0.0 sec, since it's
+ // still "live"; second tracker's age (= seconds since
+ // it was "live") is now minus first tracker's
+ // recordTime, etc:
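+ // Example (hypothetical times): trackers recorded at t=100.0s (older)
+ // and t=103.0s (newest), pruned at now=107.0s. The newest gets
+ // ageSec=0.0; the older gets ageSec = 107.0 - 103.0 = 4.0, i.e. the
+ // seconds since it was replaced by the newer searcher.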
if (pruner.doPrune(ageSec, tracker.searcher)) {
+ //System.out.println("PRUNE version=" + tracker.version + " age=" + ageSec + " ms=" + System.currentTimeMillis());
searchers.remove(tracker.version);
tracker.close();
}
+ lastRecordTimeSec = tracker.recordTimeSec;
}
}
Index: lucene/src/java/org/apache/lucene/search/TopDocs.java
===================================================================
--- lucene/src/java/org/apache/lucene/search/TopDocs.java (revision 1214418)
+++ lucene/src/java/org/apache/lucene/search/TopDocs.java (working copy)
@@ -216,8 +216,10 @@
float maxScore = Float.MIN_VALUE;
for(int shardIDX=0;shardIDX<shardHits.length;shardIDX++) {
final TopDocs shard = shardHits[shardIDX];
+ // totalHits can be non-zero even if no hits were
+ // collected, when searchAfter was used:
+ totalHitCount += shard.totalHits;
if (shard.scoreDocs != null && shard.scoreDocs.length > 0) {
- totalHitCount += shard.totalHits;
availHitCount += shard.scoreDocs.length;
queue.add(new ShardRef(shardIDX));
maxScore = Math.max(maxScore, shard.getMaxScore());
@@ -225,6 +227,10 @@
}
}
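+ // No hits were collected from any shard (e.g. searchAfter exhausted
+ // them): maxScore is undefined, so report NaN rather than the
+ // Float.MIN_VALUE sentinel used during merging: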
+ if (availHitCount == 0) {
+ maxScore = Float.NaN;
+ }
+
final ScoreDoc[] hits = new ScoreDoc[Math.min(topN, availHitCount)];
int hitUpto = 0;
Index: lucene/src/java/org/apache/lucene/search/ScoreDoc.java
===================================================================
--- lucene/src/java/org/apache/lucene/search/ScoreDoc.java (revision 1214418)
+++ lucene/src/java/org/apache/lucene/search/ScoreDoc.java (working copy)
@@ -46,6 +46,6 @@
// A convenience method for debugging.
@Override
public String toString() {
- return "doc=" + doc + " score=" + score;
+ return "doc=" + doc + " score=" + score + " shardIndex=" + shardIndex;
}
}
Index: lucene/src/test-framework/java/org/apache/lucene/search/ShardSearchingTestBase.java
===================================================================
--- lucene/src/test-framework/java/org/apache/lucene/search/ShardSearchingTestBase.java (revision 0)
+++ lucene/src/test-framework/java/org/apache/lucene/search/ShardSearchingTestBase.java (working copy)
@@ -0,0 +1,575 @@
+package org.apache.lucene.search;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.LineFileDocs;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TermContext;
+
+// TODO
+// - doc blocks? so we can test joins/grouping...
+// - controlled consistency (NRTMgr)
+
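+/**
+ * Base class that mocks a sharded search environment: each "node" gets its
+ * own Directory and IndexWriter, term and collection statistics are shared
+ * across nodes on reopen (see broadcastNodeReopen), and ShardIndexSearcher
+ * matches only the local shard while scoring with the aggregated stats.
+ * Subclasses call {@link #start}, acquire ShardIndexSearchers from the
+ * nodes, then call {@link #finish}.
+ */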
+public abstract class ShardSearchingTestBase extends LuceneTestCase {
+
+ // TODO: maybe SLM should throw this instead of returning null...
+ public static class SearcherExpiredException extends RuntimeException {
+ public SearcherExpiredException(String message) {
+ super(message);
+ }
+ }
+
+ private static class FieldAndShardVersion {
+ private final long version;
+ private final int nodeID;
+ private final String field;
+
+ public FieldAndShardVersion(int nodeID, long version, String field) {
+ this.nodeID = nodeID;
+ this.version = version;
+ this.field = field;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (version * nodeID + field.hashCode());
+ }
+
+ @Override
+ public boolean equals(Object _other) {
+ if (!(_other instanceof FieldAndShardVersion)) {
+ return false;
+ }
+
+ final FieldAndShardVersion other = (FieldAndShardVersion) _other;
+
+ return field.equals(other.field) && version == other.version && nodeID == other.nodeID;
+ }
+
+ @Override
+ public String toString() {
+ return "FieldAndShardVersion(field=" + field + " nodeID=" + nodeID + " version=" + version+ ")";
+ }
+ }
+
+ private static class TermAndShardVersion {
+ private final long version;
+ private final int nodeID;
+ private final Term term;
+
+ public TermAndShardVersion(int nodeID, long version, Term term) {
+ this.nodeID = nodeID;
+ this.version = version;
+ this.term = term;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (version * nodeID + term.hashCode());
+ }
+
+ @Override
+ public boolean equals(Object _other) {
+ if (!(_other instanceof TermAndShardVersion)) {
+ return false;
+ }
+
+ final TermAndShardVersion other = (TermAndShardVersion) _other;
+
+ return term.equals(other.term) && version == other.version && nodeID == other.nodeID;
+ }
+ }
+
+ // We share collection stats for these fields on each node
+ // reopen:
+ private final String[] fieldsToShare = new String[] {"body", "title"};
+
+ // Called by one node once it has reopened, to notify all
+ // other nodes. This is just a mock (since it goes and
+ // directly updates all other nodes, in RAM)... in a real
+ // env this would hit the wire, sending version &
+ // collection stats to all other nodes:
+ void broadcastNodeReopen(int nodeID, long version, IndexSearcher newSearcher) throws IOException {
+
+ if (VERBOSE) {
+ System.out.println("REOPEN: nodeID=" + nodeID + " version=" + version + " maxDoc=" + newSearcher.getIndexReader().maxDoc());
+ }
+
+ // Broadcast new collection stats for this node to all
+ // other nodes:
+ for(String field : fieldsToShare) {
+ final CollectionStatistics stats = newSearcher.collectionStatistics(field);
+ for (NodeState node : nodes) {
+ // Don't put my own collection stats into the cache;
+ // we pull locally:
+ if (node.myNodeID != nodeID) {
+ node.collectionStatsCache.put(new FieldAndShardVersion(nodeID, version, field), stats);
+ }
+ }
+ }
+ for (NodeState node : nodes) {
+ node.updateNodeVersion(nodeID, version);
+ }
+ }
+
+ // MOCK: in a real env you have to hit the wire
+ // (send this query to all remote nodes
+ // concurrently):
+ TopDocs searchNode(int nodeID, long[] nodeVersions, Query q, Sort sort, int numHits, ScoreDoc searchAfter) throws IOException {
+ final NodeState.ShardIndexSearcher s = nodes[nodeID].acquire(nodeVersions);
+ try {
+ if (sort == null) {
+ if (searchAfter != null) {
+ return s.localSearchAfter(searchAfter, q, numHits);
+ } else {
+ return s.localSearch(q, numHits);
+ }
+ } else {
+ assert searchAfter == null; // not supported yet
+ return s.localSearch(q, numHits, sort);
+ }
+ } finally {
+ nodes[nodeID].release(s);
+ }
+ }
+
+ // Mock: in a real env, this would hit the wire and get
+ // term stats from remote node
+ Map<Term,TermStatistics> getNodeTermStats(Set<Term> terms, int nodeID, long version) throws IOException {
+ final NodeState node = nodes[nodeID];
+ final Map<Term,TermStatistics> stats = new HashMap<Term,TermStatistics>();
+ final IndexSearcher s = node.searchers.acquire(version);
+ if (s == null) {
+ throw new SearcherExpiredException("node=" + nodeID + " version=" + version);
+ }
+ try {
+ for(Term term : terms) {
+ final TermContext termContext = TermContext.build(s.getIndexReader().getTopReaderContext(), term, false);
+ stats.put(term, s.termStatistics(term, termContext));
+ }
+ } finally {
+ node.searchers.release(s);
+ }
+ return stats;
+ }
+
+ protected final class NodeState implements Closeable {
+ public final Directory dir;
+ public final IndexWriter writer;
+ public final SearcherLifetimeManager searchers;
+ public final SearcherManager mgr;
+ public final int myNodeID;
+ public final long[] currentNodeVersions;
+
+ // TODO: nothing evicts from here!!! Somehow, on searcher
+ // expiration on remote nodes we must evict from our
+ // local cache...?
+
+ private final Map<FieldAndShardVersion,CollectionStatistics> collectionStatsCache = new ConcurrentHashMap<FieldAndShardVersion,CollectionStatistics>();
+ private final Map<TermAndShardVersion,TermStatistics> termStatsCache = new ConcurrentHashMap<TermAndShardVersion,TermStatistics>();
+
+ /** Matches docs in the local shard but scores based on
+ * aggregated stats ("mock distributed scoring") from all
+ * nodes. */
+
+ public class ShardIndexSearcher extends IndexSearcher {
+ // Version for the node searchers we search:
+ public final long[] nodeVersions;
+ public final int myNodeID;
+
+ public ShardIndexSearcher(long[] nodeVersions, IndexReader localReader, int nodeID) {
+ super(localReader);
+ this.nodeVersions = nodeVersions;
+ myNodeID = nodeID;
+ assert myNodeID == NodeState.this.myNodeID: "myNodeID=" + nodeID + " NodeState.this.myNodeID=" + NodeState.this.myNodeID;
+ }
+
+ @Override
+ public Query rewrite(Query original) throws IOException {
+ final Query rewritten = super.rewrite(original);
+ final Set<Term> terms = new HashSet<Term>();
+ rewritten.extractTerms(terms);
+
+ // Make a single request to remote nodes for term
+ // stats:
+ for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
+ if (nodeID == myNodeID) {
+ continue;
+ }
+
+ final Set<Term> missing = new HashSet<Term>();
+ for(Term term : terms) {
+ final TermAndShardVersion key = new TermAndShardVersion(nodeID, nodeVersions[nodeID], term);
+ if (!termStatsCache.containsKey(key)) {
+ missing.add(term);
+ }
+ }
+ if (missing.size() != 0) {
+ for(Map.Entry<Term,TermStatistics> ent : getNodeTermStats(missing, nodeID, nodeVersions[nodeID]).entrySet()) {
+ final TermAndShardVersion key = new TermAndShardVersion(nodeID, nodeVersions[nodeID], ent.getKey());
+ termStatsCache.put(key, ent.getValue());
+ }
+ }
+ }
+
+ return rewritten;
+ }
+
+ @Override
+ public TermStatistics termStatistics(Term term, TermContext context) throws IOException {
+ assert term != null;
+ int docFreq = 0;
+ long totalTermFreq = 0;
+ for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
+
+ final TermStatistics subStats;
+ if (nodeID == myNodeID) {
+ subStats = super.termStatistics(term, context);
+ } else {
+ final TermAndShardVersion key = new TermAndShardVersion(nodeID, nodeVersions[nodeID], term);
+ subStats = termStatsCache.get(key);
+ // We pre-cached during rewrite so all terms
+ // better be here...
+ assert subStats != null;
+ }
+
+ docFreq += subStats.docFreq();
+ totalTermFreq += subStats.totalTermFreq();
+ }
+
+ return new TermStatistics(term.bytes(), docFreq, totalTermFreq);
+ }
+
+ @Override
+ public CollectionStatistics collectionStatistics(String field) throws IOException {
+ // TODO: we could compute this on init and cache,
+ // since we are re-inited whenever any nodes have a
+ // new reader
+ int docCount = 0;
+ long sumTotalTermFreq = 0;
+ long sumDocFreq = 0;
+ int maxDoc = 0;
+
+ for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
+ final FieldAndShardVersion key = new FieldAndShardVersion(nodeID, nodeVersions[nodeID], field);
+ final CollectionStatistics nodeStats;
+ if (nodeID == myNodeID) {
+ nodeStats = super.collectionStatistics(field);
+ } else {
+ nodeStats = collectionStatsCache.get(key);
+ }
+ if (nodeStats == null) {
+ System.out.println("coll stats myNodeID=" + myNodeID + ": " + collectionStatsCache.keySet());
+ }
+ // Collection stats are pre-shared on reopen, so,
+ // we better not have a cache miss:
+ assert nodeStats != null: "myNodeID=" + myNodeID + " nodeID=" + nodeID + " version=" + nodeVersions[nodeID] + " field=" + field;
+ docCount += nodeStats.docCount();
+ sumTotalTermFreq += nodeStats.sumTotalTermFreq();
+ sumDocFreq += nodeStats.sumDocFreq();
+ maxDoc += nodeStats.maxDoc();
+ }
+
+ return new CollectionStatistics(field, maxDoc, docCount, sumTotalTermFreq, sumDocFreq);
+ }
+
+ @Override
+ public TopDocs search(Query query, int numHits) throws IOException {
+ final TopDocs[] shardHits = new TopDocs[nodeVersions.length];
+ for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
+ if (nodeID == myNodeID) {
+ // My node; run using local shard searcher we
+ // already acquired:
+ shardHits[nodeID] = localSearch(query, numHits);
+ } else {
+ shardHits[nodeID] = searchNode(nodeID, nodeVersions, query, null, numHits, null);
+ }
+ }
+
+ // Merge:
+ return TopDocs.merge(null, numHits, shardHits);
+ }
+
+ public TopDocs localSearch(Query query, int numHits) throws IOException {
+ return super.search(query, numHits);
+ }
+
+ @Override
+ public TopDocs searchAfter(ScoreDoc after, Query query, int numHits) throws IOException {
+ final TopDocs[] shardHits = new TopDocs[nodeVersions.length];
+ ScoreDoc shardAfter = new ScoreDoc(after.doc, after.score);
+ for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
+ if (nodeID < after.shardIndex) {
+ // If score is tied then no docs in this shard
+ // should be collected:
+ shardAfter.doc = Integer.MAX_VALUE;
+ } else if (nodeID == after.shardIndex) {
+ // If score is tied then we break according to
+ // docID (like normal):
+ shardAfter.doc = after.doc;
+ } else {
+ // If score is tied then all docs in this shard
+ // should be collected, because they come after
+ // the previous bottom:
+ shardAfter.doc = -1;
+ }
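+ // Example (hypothetical values): after = doc 17 on shard 1. Shard 0
+ // resumes after doc Integer.MAX_VALUE (skip all score ties), shard 1
+ // resumes after doc 17, and shards > 1 resume after doc -1 (collect
+ // all score ties), mirroring how TopDocs.merge breaks score ties by
+ // shardIndex then doc.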
+ if (nodeID == myNodeID) {
+ // My node; run using local shard searcher we
+ // already acquired:
+ shardHits[nodeID] = localSearchAfter(shardAfter, query, numHits);
+ } else {
+ shardHits[nodeID] = searchNode(nodeID, nodeVersions, query, null, numHits, shardAfter);
+ }
+ //System.out.println(" node=" + nodeID + " totHits=" + shardHits[nodeID].totalHits);
+ }
+
+ // Merge:
+ return TopDocs.merge(null, numHits, shardHits);
+ }
+
+ public TopDocs localSearchAfter(ScoreDoc after, Query query, int numHits) throws IOException {
+ return super.searchAfter(after, query, numHits);
+ }
+
+ @Override
+ public TopFieldDocs search(Query query, int numHits, Sort sort) throws IOException {
+ assert sort != null;
+ final TopDocs[] shardHits = new TopDocs[nodeVersions.length];
+ for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
+ if (nodeID == myNodeID) {
+ // My node; run using local shard searcher we
+ // already acquired:
+ shardHits[nodeID] = localSearch(query, numHits, sort);
+ } else {
+ shardHits[nodeID] = searchNode(nodeID, nodeVersions, query, sort, numHits, null);
+ }
+ }
+
+ // Merge:
+ return (TopFieldDocs) TopDocs.merge(sort, numHits, shardHits);
+ }
+
+ public TopFieldDocs localSearch(Query query, int numHits, Sort sort) throws IOException {
+ return super.search(query, numHits, sort);
+ }
+
+ }
+
+ private volatile ShardIndexSearcher currentShardSearcher;
+
+ public NodeState(Random random, String baseDir, int nodeID, int numNodes) throws IOException {
+ myNodeID = nodeID;
+ dir = newFSDirectory(new File(baseDir + "." + myNodeID));
+ // TODO: set warmer
+ writer = new IndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
+ mgr = new SearcherManager(writer, true, null);
+ searchers = new SearcherLifetimeManager();
+
+ // Init w/ 0s... caller above will do initial
+ // "broadcast" by calling initSearcher:
+ currentNodeVersions = new long[numNodes];
+ }
+
+ public void initSearcher(long[] nodeVersions) {
+ assert currentShardSearcher == null;
+ System.arraycopy(nodeVersions, 0, currentNodeVersions, 0, currentNodeVersions.length);
+ currentShardSearcher = new ShardIndexSearcher(currentNodeVersions.clone(),
+ mgr.acquire().getIndexReader(),
+ myNodeID);
+ }
+
+ public void updateNodeVersion(int nodeID, long version) throws IOException {
+ currentNodeVersions[nodeID] = version;
+ if (currentShardSearcher != null) {
+ currentShardSearcher.getIndexReader().decRef();
+ }
+ currentShardSearcher = new ShardIndexSearcher(currentNodeVersions.clone(),
+ mgr.acquire().getIndexReader(),
+ myNodeID);
+ }
+
+ // Get the current (fresh) searcher for this node
+ public ShardIndexSearcher acquire() {
+ final ShardIndexSearcher s = currentShardSearcher;
+ // TODO: this isn't thread safe.... in theory the
+ // reader could get decRef'd to 0 before we have a
+ // chance to incRef, ie if a reopen happens right
+ // after the above line, this thread gets stalled, and
+ // the old IR is closed. But because we use SLM in
+ // this test, this will be exceptionally rare:
+ s.getIndexReader().incRef();
+ return s;
+ }
+
+ public void release(ShardIndexSearcher s) throws IOException {
+ s.getIndexReader().decRef();
+ }
+
+ // Get an old searcher matching the specified versions:
+ public ShardIndexSearcher acquire(long[] nodeVersions) {
+ final IndexSearcher s = searchers.acquire(nodeVersions[myNodeID]);
+ if (s == null) {
+ throw new SearcherExpiredException("nodeID=" + myNodeID + " version=" + nodeVersions[myNodeID]);
+ }
+ return new ShardIndexSearcher(nodeVersions, s.getIndexReader(), myNodeID);
+ }
+
+ // Reopen local reader
+ public void reopen() throws IOException {
+ final IndexSearcher before = mgr.acquire();
+ mgr.release(before);
+
+ mgr.maybeReopen();
+ final IndexSearcher after = mgr.acquire();
+ try {
+ if (after != before) {
+ // New searcher was opened
+ final long version = searchers.record(after);
+ searchers.prune(new SearcherLifetimeManager.PruneByAge(maxSearcherAgeSeconds));
+ broadcastNodeReopen(myNodeID, version, after);
+ }
+ } finally {
+ mgr.release(after);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (currentShardSearcher != null) {
+ currentShardSearcher.getIndexReader().decRef();
+ }
+ searchers.close();
+ mgr.close();
+ writer.close();
+ dir.close();
+ }
+ }
+
+ // TODO: make this more realistic, ie, each node should
+ // have its own thread, so we have true node to node
+ // concurrency
+ private final class ChangeIndices extends Thread {
+ @Override
+ public void run() {
+ try {
+ final LineFileDocs docs = new LineFileDocs(random);
+ int numDocs = 0;
+ while (System.nanoTime() < endTimeNanos) {
+ final int what = random.nextInt(3);
+ final NodeState node = nodes[random.nextInt(nodes.length)];
+ if (numDocs == 0 || what == 0) {
+ node.writer.addDocument(docs.nextDoc());
+ numDocs++;
+ } else if (what == 1) {
+ node.writer.updateDocument(new Term("docid", ""+random.nextInt(numDocs)),
+ docs.nextDoc());
+ numDocs++;
+ } else {
+ node.writer.deleteDocuments(new Term("docid", ""+random.nextInt(numDocs)));
+ }
+ // TODO: doc blocks too
+
+ if (random.nextInt(17) == 12) {
+ node.writer.commit();
+ }
+
+ if (random.nextInt(17) == 12) {
+ nodes[random.nextInt(nodes.length)].reopen();
+ }
+ }
+ } catch (Throwable t) {
+ System.out.println("FAILED:");
+ t.printStackTrace(System.out);
+ throw new RuntimeException(t);
+ }
+ }
+ }
+
+ protected NodeState[] nodes;
+ int maxSearcherAgeSeconds;
+ long endTimeNanos;
+ private Thread changeIndicesThread;
+
+ protected void start(String baseDirName, int numNodes, double runTimeSec, int maxSearcherAgeSeconds) throws IOException {
+
+ endTimeNanos = System.nanoTime() + (long) (runTimeSec*1000000000);
+ this.maxSearcherAgeSeconds = maxSearcherAgeSeconds;
+
+ nodes = new NodeState[numNodes];
+ for(int nodeID=0;nodeID<numNodes;nodeID++) {
+ nodes[nodeID] = new NodeState(random, baseDirName, nodeID, numNodes);
+ }
+
+ long[] nodeVersions = new long[nodes.length];
+ for(int nodeID=0;nodeID<numNodes;nodeID++) {
+ final IndexSearcher s = nodes[nodeID].mgr.acquire();
+ try {
+ nodeVersions[nodeID] = nodes[nodeID].searchers.record(s);
+ } finally {
+ nodes[nodeID].mgr.release(s);
+ }
+ }
+
+ for(int nodeID=0;nodeID<numNodes;nodeID++) {
+ final IndexSearcher s = nodes[nodeID].mgr.acquire();
+ assert s != null;
+ assert nodeVersions[nodeID] == nodes[nodeID].searchers.record(s);
+ try {
+ broadcastNodeReopen(nodeID, nodeVersions[nodeID], s);
+ } finally {
+ nodes[nodeID].mgr.release(s);
+ }
+ }
+
+ changeIndicesThread = new ChangeIndices();
+ changeIndicesThread.start();
+ }
+
+ protected void finish() throws InterruptedException, IOException {
+ changeIndicesThread.join();
+ for(NodeState node : nodes) {
+ node.close();
+ }
+ }
+
+ protected static class SearcherAndVersion {
+ public final IndexSearcher searcher;
+ public final long version;
+
+ public SearcherAndVersion(IndexSearcher searcher, long version) {
+ this.searcher = searcher;
+ this.version = version;
+ }
+ }
+}
Property changes on: lucene/src/test-framework/java/org/apache/lucene/search/ShardSearchingTestBase.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native