/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.search;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermStates;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.PrintStreamInfoStream;
import org.apache.lucene.util.TestUtil;
// TODO
// - doc blocks? so we can test joins/grouping...
// - controlled consistency (NRTMgr)
/**
* Base test class for simulating distributed search across multiple shards.
*/
public abstract class ShardSearchingTestBase extends LuceneTestCase {
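// A minimal usage sketch (hypothetical subclass and test method, shown only
// for illustration; they are not part of this base class):
//
//   public class MyShardSearchTest extends ShardSearchingTestBase {
//     public void testDistributed() throws Exception {
//       start(3, 5.0, 10); // 3 nodes, ~5 sec run time, 10 sec searcher lease
//       NodeState.ShardIndexSearcher s = nodes[0].acquire();
//       try {
//         TopDocs hits = s.search(new TermQuery(new Term("body", "test")), 10);
//       } finally {
//         nodes[0].release(s);
//       }
//       finish();
//     }
//   }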
// TODO: maybe SearcherLifetimeManager should throw this instead of returning null...
/**
* Thrown when the lease for a searcher has expired.
*/
public static class SearcherExpiredException extends RuntimeException {
public SearcherExpiredException(String message) {
super(message);
}
}
private static class FieldAndShardVersion {
private final long version;
private final int nodeID;
private final String field;
public FieldAndShardVersion(int nodeID, long version, String field) {
this.nodeID = nodeID;
this.version = version;
this.field = field;
}
@Override
public int hashCode() {
return (int) (version * nodeID + field.hashCode());
}
@Override
public boolean equals(Object _other) {
if (!(_other instanceof FieldAndShardVersion)) {
return false;
}
final FieldAndShardVersion other = (FieldAndShardVersion) _other;
return field.equals(other.field) && version == other.version && nodeID == other.nodeID;
}
@Override
public String toString() {
return "FieldAndShardVersion(field=" + field + " nodeID=" + nodeID + " version=" + version+ ")";
}
}
private static class TermAndShardVersion {
private final long version;
private final int nodeID;
private final Term term;
public TermAndShardVersion(int nodeID, long version, Term term) {
this.nodeID = nodeID;
this.version = version;
this.term = term;
}
@Override
public int hashCode() {
return (int) (version * nodeID + term.hashCode());
}
@Override
public boolean equals(Object _other) {
if (!(_other instanceof TermAndShardVersion)) {
return false;
}
final TermAndShardVersion other = (TermAndShardVersion) _other;
return term.equals(other.term) && version == other.version && nodeID == other.nodeID;
}
}
// We share collection stats for these fields on each node reopen:
private final String[] fieldsToShare = new String[] {"body", "title"};
// Called by one node once it has reopened, to notify all
// other nodes. This is just a mock (it directly updates
// all other nodes, in RAM); in a real env this would hit
// the wire, sending the version & collection stats to all
// other nodes:
void broadcastNodeReopen(int nodeID, long version, IndexSearcher newSearcher) throws IOException {
if (VERBOSE) {
System.out.println("REOPEN: nodeID=" + nodeID + " version=" + version + " maxDoc=" + newSearcher.getIndexReader().maxDoc());
}
// Broadcast new collection stats for this node to all
// other nodes:
for(String field : fieldsToShare) {
final CollectionStatistics stats = newSearcher.collectionStatistics(field);
if (stats != null) {
for (NodeState node : nodes) {
// Don't put my own collection stats into the cache;
// we pull locally:
if (node.myNodeID != nodeID) {
node.collectionStatsCache.put(new FieldAndShardVersion(nodeID, version, field), stats);
}
}
}
}
for (NodeState node : nodes) {
node.updateNodeVersion(nodeID, version);
}
}
// TODO: broadcastNodeExpire? then we can purge the
// known-stale cache entries...
// MOCK: in a real env you have to hit the wire
// (send this query to all remote nodes
// concurrently):
TopDocs searchNode(int nodeID, long[] nodeVersions, Query q, Sort sort, int numHits, ScoreDoc searchAfter) throws IOException {
final NodeState.ShardIndexSearcher s = nodes[nodeID].acquire(nodeVersions);
try {
if (sort == null) {
if (searchAfter != null) {
return s.localSearchAfter(searchAfter, q, numHits);
} else {
return s.localSearch(q, numHits);
}
} else {
assert searchAfter == null; // not supported yet
return s.localSearch(q, numHits, sort);
}
} finally {
nodes[nodeID].release(s);
}
}
// Mock: in a real env, this would hit the wire and get
// term stats from the remote node:
Map<Term,TermStatistics> getNodeTermStats(Set<Term> terms, int nodeID, long version) throws IOException {
final NodeState node = nodes[nodeID];
final Map<Term,TermStatistics> stats = new HashMap<>();
final IndexSearcher s = node.searchers.acquire(version);
if (s == null) {
throw new SearcherExpiredException("node=" + nodeID + " version=" + version);
}
try {
for(Term term : terms) {
final TermStates ts = TermStates.build(s.getIndexReader().getContext(), term, true);
if (ts.docFreq() > 0) {
stats.put(term, s.termStatistics(term, ts.docFreq(), ts.totalTermFreq()));
}
}
} finally {
node.searchers.release(s);
}
return stats;
}
protected final class NodeState implements Closeable {
public final Directory dir;
public final IndexWriter writer;
public final SearcherLifetimeManager searchers;
public final SearcherManager mgr;
public final int myNodeID;
public final long[] currentNodeVersions;
// TODO: nothing evicts from here!!! Somehow, on searcher
// expiration on remote nodes we must evict from our
// local cache...? And still LRU otherwise (for the
// still-live searchers).
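// Keys include the remote node's searcher version, so lookups never mix
// stats from different searcher generations on the same node: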
private final Map<FieldAndShardVersion,CollectionStatistics> collectionStatsCache = new ConcurrentHashMap<>();
private final Map<TermAndShardVersion,TermStatistics> termStatsCache = new ConcurrentHashMap<>();
/** Matches docs in the local shard but scores based on
* aggregated stats ("mock distributed scoring") from all
* nodes. */
public class ShardIndexSearcher extends IndexSearcher {
// Versions of the node searchers we search:
public final long[] nodeVersions;
public final int myNodeID;
public ShardIndexSearcher(long[] nodeVersions, IndexReader localReader, int nodeID) {
super(localReader);
this.nodeVersions = nodeVersions;
myNodeID = nodeID;
assert myNodeID == NodeState.this.myNodeID: "myNodeID=" + nodeID + " NodeState.this.myNodeID=" + NodeState.this.myNodeID;
}
@Override
public Query rewrite(Query original) throws IOException {
final IndexSearcher localSearcher = new IndexSearcher(getIndexReader());
original = localSearcher.rewrite(original);
final Set<Term> terms = new HashSet<>();
original.visit(QueryVisitor.termCollector(terms));
// Make a single request to remote nodes for term
// stats:
for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
if (nodeID == myNodeID) {
continue;
}
final Set<Term> missing = new HashSet<>();
for(Term term : terms) {
final TermAndShardVersion key = new TermAndShardVersion(nodeID, nodeVersions[nodeID], term);
if (!termStatsCache.containsKey(key)) {
missing.add(term);
}
}
if (!missing.isEmpty()) {
for(Map.Entry<Term,TermStatistics> ent : getNodeTermStats(missing, nodeID, nodeVersions[nodeID]).entrySet()) {
if (ent.getValue() != null) {
final TermAndShardVersion key = new TermAndShardVersion(nodeID, nodeVersions[nodeID], ent.getKey());
termStatsCache.put(key, ent.getValue());
}
}
}
}
return original;
}
@Override
public TermStatistics termStatistics(Term term, int docFreq, long totalTermFreq) throws IOException {
assert term != null;
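// Aggregate docFreq/totalTermFreq across all nodes: local stats come
// from super, remote stats from the per-version cache that rewrite()
// populated: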
long distributedDocFreq = 0;
long distributedTotalTermFreq = 0;
for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
final TermStatistics subStats;
if (nodeID == myNodeID) {
subStats = super.termStatistics(term, docFreq, totalTermFreq);
} else {
final TermAndShardVersion key = new TermAndShardVersion(nodeID, nodeVersions[nodeID], term);
subStats = termStatsCache.get(key);
if (subStats == null) {
continue; // term not found
}
}
long nodeDocFreq = subStats.docFreq();
distributedDocFreq += nodeDocFreq;
long nodeTotalTermFreq = subStats.totalTermFreq();
distributedTotalTermFreq += nodeTotalTermFreq;
}
assert distributedDocFreq > 0;
return new TermStatistics(term.bytes(), distributedDocFreq, distributedTotalTermFreq);
}
@Override
public CollectionStatistics collectionStatistics(String field) throws IOException {
// TODO: we could compute this on init and cache,
// since we are re-inited whenever any nodes have a
// new reader
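// Sum the field's stats across all nodes: local stats come from super,
// remote stats from the cache populated by broadcastNodeReopen: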
long docCount = 0;
long sumTotalTermFreq = 0;
long sumDocFreq = 0;
long maxDoc = 0;
for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
final FieldAndShardVersion key = new FieldAndShardVersion(nodeID, nodeVersions[nodeID], field);
final CollectionStatistics nodeStats;
if (nodeID == myNodeID) {
nodeStats = super.collectionStatistics(field);
} else {
nodeStats = collectionStatsCache.get(key);
}
if (nodeStats == null) {
continue; // field not in sub at all
}
long nodeDocCount = nodeStats.docCount();
docCount += nodeDocCount;
long nodeSumTotalTermFreq = nodeStats.sumTotalTermFreq();
sumTotalTermFreq += nodeSumTotalTermFreq;
long nodeSumDocFreq = nodeStats.sumDocFreq();
sumDocFreq += nodeSumDocFreq;
assert nodeStats.maxDoc() >= 0;
maxDoc += nodeStats.maxDoc();
}
if (maxDoc == 0) {
return null; // field not found across any node whatsoever
} else {
return new CollectionStatistics(field, maxDoc, docCount, sumTotalTermFreq, sumDocFreq);
}
}
@Override
public TopDocs search(Query query, int numHits) throws IOException {
final TopDocs[] shardHits = new TopDocs[nodeVersions.length];
for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
if (nodeID == myNodeID) {
// My node; run using the local shard searcher we
// already acquired:
shardHits[nodeID] = localSearch(query, numHits);
} else {
shardHits[nodeID] = searchNode(nodeID, nodeVersions, query, null, numHits, null);
}
}
// Merge:
return TopDocs.merge(numHits, shardHits);
}
public TopDocs localSearch(Query query, int numHits) throws IOException {
return super.search(query, numHits);
}
@Override
public TopDocs searchAfter(ScoreDoc after, Query query, int numHits) throws IOException {
if (after == null) {
return super.searchAfter(after, query, numHits);
}
final TopDocs[] shardHits = new TopDocs[nodeVersions.length];
// Results are merged in this order: score, shardIndex, doc. Therefore we
// keep after.score, and depending on the nodeID we set doc so that each
// shard will either:
// - not collect any more documents with that score (only with worse scores)
// - collect more documents with that score (and worse) following the last collected document
// - collect all documents with that score (and worse)
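// For example (hypothetical values): with after = (score=2.0,
// shardIndex=1, doc=7) over three shards, shard 0 collects only docs
// scoring below 2.0, shard 1 resumes among docs scoring 2.0 at docIDs
// greater than 7, and shard 2 collects all docs scoring 2.0 or below.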
ScoreDoc shardAfter = new ScoreDoc(after.doc, after.score);
for (int nodeID = 0; nodeID < nodeVersions.length; nodeID++) {
if (nodeID < after.shardIndex) {
// all documents with after.score were already collected, so collect
// only documents with worse scores.
final NodeState.ShardIndexSearcher s = nodes[nodeID].acquire(nodeVersions);
try {
// Setting after.doc to reader.maxDoc-1 is a way to tell
// TopScoreDocCollector that no more docs with that score should
// be collected. Note that in practice the shard that sends the
// request to a remote shard won't have reader.maxDoc at hand, so
// it will send some arbitrary value which will be fixed on the
// other end.
shardAfter.doc = s.getIndexReader().maxDoc() - 1;
} finally {
nodes[nodeID].release(s);
}
} else if (nodeID == after.shardIndex) {
// collect all documents following the last collected doc with
// after.score + documents with worse scores.
shardAfter.doc = after.doc;
} else {
// all documents with after.score (and worse) should be collected
// because they didn't make it to top-N in the previous round.
shardAfter.doc = -1;
}
if (nodeID == myNodeID) {
// My node; run using the local shard searcher we
// already acquired:
shardHits[nodeID] = localSearchAfter(shardAfter, query, numHits);
} else {
shardHits[nodeID] = searchNode(nodeID, nodeVersions, query, null, numHits, shardAfter);
}
//System.out.println(" node=" + nodeID + " totHits=" + shardHits[nodeID].totalHits);
}
// Merge:
return TopDocs.merge(numHits, shardHits);
}
public TopDocs localSearchAfter(ScoreDoc after, Query query, int numHits) throws IOException {
return super.searchAfter(after, query, numHits);
}
@Override
public TopFieldDocs search(Query query, int numHits, Sort sort) throws IOException {
assert sort != null;
final TopFieldDocs[] shardHits = new TopFieldDocs[nodeVersions.length];
for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
if (nodeID == myNodeID) {
// My node; run using the local shard searcher we
// already acquired:
shardHits[nodeID] = localSearch(query, numHits, sort);
} else {
shardHits[nodeID] = (TopFieldDocs) searchNode(nodeID, nodeVersions, query, sort, numHits, null);
}
}
// Merge:
return TopDocs.merge(sort, numHits, shardHits);
}
public TopFieldDocs localSearch(Query query, int numHits, Sort sort) throws IOException {
return super.search(query, numHits, sort);
}
}
private volatile ShardIndexSearcher currentShardSearcher;
public NodeState(Random random, int nodeID, int numNodes) throws IOException {
myNodeID = nodeID;
dir = newFSDirectory(createTempDir("ShardSearchingTestBase"));
// TODO: set warmer
MockAnalyzer analyzer = new MockAnalyzer(random());
analyzer.setMaxTokenLength(TestUtil.nextInt(random(), 1, IndexWriter.MAX_TERM_LENGTH));
IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
iwc.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
if (VERBOSE) {
iwc.setInfoStream(new PrintStreamInfoStream(System.out));
}
writer = new IndexWriter(dir, iwc);
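// NRT SearcherManager over the writer; the null SearcherFactory means
// default IndexSearcher instances are created: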
mgr = new SearcherManager(writer, null);
searchers = new SearcherLifetimeManager();
// Init w/ 0s... caller above will do initial
// "broadcast" by calling initSearcher:
currentNodeVersions = new long[numNodes];
}
public void initSearcher(long[] nodeVersions) throws IOException {
assert currentShardSearcher == null;
System.arraycopy(nodeVersions, 0, currentNodeVersions, 0, currentNodeVersions.length);
currentShardSearcher = new ShardIndexSearcher(currentNodeVersions.clone(),
mgr.acquire().getIndexReader(),
myNodeID);
}
public void updateNodeVersion(int nodeID, long version) throws IOException {
currentNodeVersions[nodeID] = version;
if (currentShardSearcher != null) {
currentShardSearcher.getIndexReader().decRef();
}
currentShardSearcher = new ShardIndexSearcher(currentNodeVersions.clone(),
mgr.acquire().getIndexReader(),
myNodeID);
}
// Get the current (fresh) searcher for this node
public ShardIndexSearcher acquire() {
while(true) {
final ShardIndexSearcher s = currentShardSearcher;
// In theory the reader could get decRef'd to 0
// before we have a chance to incRef, i.e. if a reopen
// happens right after the above line, this thread
// gets stalled, and the old IR is closed. So we
// must try/retry until incRef succeeds:
if (s.getIndexReader().tryIncRef()) {
return s;
}
}
}
public void release(ShardIndexSearcher s) throws IOException {
s.getIndexReader().decRef();
}
// Get an old searcher matching the specified versions:
public ShardIndexSearcher acquire(long[] nodeVersions) {
final IndexSearcher s = searchers.acquire(nodeVersions[myNodeID]);
if (s == null) {
throw new SearcherExpiredException("nodeID=" + myNodeID + " version=" + nodeVersions[myNodeID]);
}
return new ShardIndexSearcher(nodeVersions, s.getIndexReader(), myNodeID);
}
// Reopen local reader
public void reopen() throws IOException {
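// Acquire (and immediately release) the current searcher so we can
// detect below whether maybeRefresh actually installed a new one: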
final IndexSearcher before = mgr.acquire();
mgr.release(before);
mgr.maybeRefresh();
final IndexSearcher after = mgr.acquire();
try {
if (after != before) {
// New searcher was opened
final long version = searchers.record(after);
searchers.prune(new SearcherLifetimeManager.PruneByAge(maxSearcherAgeSeconds));
broadcastNodeReopen(myNodeID, version, after);
}
} finally {
mgr.release(after);
}
}
@Override
public void close() throws IOException {
if (currentShardSearcher != null) {
currentShardSearcher.getIndexReader().decRef();
}
searchers.close();
mgr.close();
writer.close();
dir.close();
}
}
// TODO: make this more realistic, ie, each node should
// have its own thread, so we have true node to node
// concurrency
private final class ChangeIndices extends Thread {
@Override
public void run() {
try (final LineFileDocs docs = new LineFileDocs(random())) {
int numDocs = 0;
while (System.nanoTime() < endTimeNanos) {
final int what = random().nextInt(3);
final NodeState node = nodes[random().nextInt(nodes.length)];
if (numDocs == 0 || what == 0) {
node.writer.addDocument(docs.nextDoc());
numDocs++;
} else if (what == 1) {
node.writer.updateDocument(new Term("docid", ""+random().nextInt(numDocs)),
docs.nextDoc());
numDocs++;
} else {
node.writer.deleteDocuments(new Term("docid", ""+random().nextInt(numDocs)));
}
// TODO: doc blocks too
if (random().nextInt(17) == 12) {
node.writer.commit();
}
if (random().nextInt(17) == 12) {
nodes[random().nextInt(nodes.length)].reopen();
}
}
} catch (Throwable t) {
System.out.println("FAILED:");
t.printStackTrace(System.out);
throw new RuntimeException(t);
}
}
}
protected NodeState[] nodes;
int maxSearcherAgeSeconds;
long endTimeNanos;
private Thread changeIndicesThread;
protected void start(int numNodes, double runTimeSec, int maxSearcherAgeSeconds) throws IOException {
endTimeNanos = System.nanoTime() + (long) (runTimeSec * 1000000000);
this.maxSearcherAgeSeconds = maxSearcherAgeSeconds;
nodes = new NodeState[numNodes];
for(int nodeID=0;nodeID<numNodes;nodeID++) {
nodes[nodeID] = new NodeState(random(), nodeID, numNodes);
}
long[] nodeVersions = new long[nodes.length];
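// Record each node's initial searcher with its SearcherLifetimeManager so
// other nodes can later acquire it by version: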
for(int nodeID=0;nodeID<numNodes;nodeID++) {
final IndexSearcher s = nodes[nodeID].mgr.acquire();
try {
nodeVersions[nodeID] = nodes[nodeID].searchers.record(s);
} finally {
nodes[nodeID].mgr.release(s);
}
}
for(int nodeID=0;nodeID<numNodes;nodeID++) {
final IndexSearcher s = nodes[nodeID].mgr.acquire();
assert s != null;
assert nodeVersions[nodeID] == nodes[nodeID].searchers.record(s);
try {
broadcastNodeReopen(nodeID, nodeVersions[nodeID], s);
} finally {
nodes[nodeID].mgr.release(s);
}
}
changeIndicesThread = new ChangeIndices();
changeIndicesThread.start();
}
protected void finish() throws InterruptedException, IOException {
changeIndicesThread.join();
for(NodeState node : nodes) {
node.close();
}
}
/**
* An IndexSearcher and associated version (lease)
*/
protected static class SearcherAndVersion {
public final IndexSearcher searcher;
public final long version;
public SearcherAndVersion(IndexSearcher searcher, long version) {
this.searcher = searcher;
this.version = version;
}
}
}