| Index: lucene/src/java/org/apache/lucene/index/MergePolicy.java |
| =================================================================== |
| --- lucene/src/java/org/apache/lucene/index/MergePolicy.java (revision 1104081) |
| +++ lucene/src/java/org/apache/lucene/index/MergePolicy.java (working copy) |
| @@ -72,7 +72,7 @@ |
| long mergeGen; // used by IndexWriter |
| boolean isExternal; // used by IndexWriter |
| int maxNumSegmentsOptimize; // used by IndexWriter |
| - long estimatedMergeBytes; // used by IndexWriter |
| + public long estimatedMergeBytes; // used by IndexWriter |
| List<SegmentReader> readers; // used by IndexWriter |
| List<SegmentReader> readerClones; // used by IndexWriter |
| public final List<SegmentInfo> segments; |
| Index: lucene/contrib/CHANGES.txt |
| =================================================================== |
| --- lucene/contrib/CHANGES.txt (revision 1104081) |
| +++ lucene/contrib/CHANGES.txt (working copy) |
| @@ -85,6 +85,11 @@ |
| it cannot group by function queries nor arbitrary queries. (Mike |
| McCandless) |
| |
| + * LUCENE-3092: Added NRTCachingDirectory in contrib/misc, which |
| + caches small segments in RAM. This is useful, in near-real-time |
| + case where the indexing rate is lowish but the reopen rate is |
| + highish, to take load off the IO system. (Mike McCandless) |
| + |
| Optimizations |
| |
| * LUCENE-3040: Switch all analysis consumers (highlighter, morelikethis, memory, ...) |
| Index: lucene/contrib/misc/src/test/org/apache/lucene/store/TestNRTCachingDirectory.java |
| =================================================================== |
| --- lucene/contrib/misc/src/test/org/apache/lucene/store/TestNRTCachingDirectory.java (revision 0) |
| +++ lucene/contrib/misc/src/test/org/apache/lucene/store/TestNRTCachingDirectory.java (revision 0) |
| @@ -0,0 +1,120 @@ |
| +package org.apache.lucene.store; |
| + |
| +/** |
| + * Licensed to the Apache Software Foundation (ASF) under one or more |
| + * contributor license agreements. See the NOTICE file distributed with |
| + * this work for additional information regarding copyright ownership. |
| + * The ASF licenses this file to You under the Apache License, Version 2.0 |
| + * (the "License"); you may not use this file except in compliance with |
| + * the License. You may obtain a copy of the License at |
| + * |
| + * http://www.apache.org/licenses/LICENSE-2.0 |
| + * |
| + * Unless required by applicable law or agreed to in writing, software |
| + * distributed under the License is distributed on an "AS IS" BASIS, |
| + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| + * See the License for the specific language governing permissions and |
| + * limitations under the License. |
| + */ |
| + |
| +import java.io.File; |
| +import java.io.IOException; |
| +import java.util.ArrayList; |
| +import java.util.List; |
| +import java.util.Set; |
| + |
| +import org.apache.lucene.analysis.Analyzer; |
| +import org.apache.lucene.analysis.MockAnalyzer; |
| +import org.apache.lucene.document.Document; |
| +import org.apache.lucene.index.CorruptIndexException; |
| +import org.apache.lucene.index.IndexReader; |
| +import org.apache.lucene.index.IndexWriter; |
| +import org.apache.lucene.index.IndexWriterConfig; |
| +import org.apache.lucene.index.MergePolicy; |
| +import org.apache.lucene.index.RandomIndexWriter; |
| +import org.apache.lucene.index.SegmentInfo; |
| +import org.apache.lucene.index.SegmentInfos; |
| +import org.apache.lucene.index.Term; |
| +import org.apache.lucene.search.IndexSearcher; |
| +import org.apache.lucene.search.TermQuery; |
| +import org.apache.lucene.search.TopDocs; |
| +import org.apache.lucene.util.BytesRef; |
| +import org.apache.lucene.util.LineFileDocs; |
| +import org.apache.lucene.util.LuceneTestCase; |
| +import org.apache.lucene.util.Version; |
| +import org.apache.lucene.util._TestUtil; |
| + |
| +public class TestNRTCachingDirectory extends LuceneTestCase { |
| + |
| + public void testNRTAndCommit() throws Exception { |
| + Directory dir = newDirectory(); |
| + NRTCachingDirectory cachedDir = new NRTCachingDirectory(dir, 2.0, 25.0); |
| + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)); |
| + conf.setMergeScheduler(cachedDir.getMergeScheduler()); |
| + RandomIndexWriter w = new RandomIndexWriter(random, cachedDir, conf); |
| + w.w.setInfoStream(VERBOSE ? System.out : null); |
| + final LineFileDocs docs = new LineFileDocs(random); |
| + final int numDocs = _TestUtil.nextInt(random, 100, 400); |
| + |
| + if (VERBOSE) { |
| + System.out.println("TEST: numDocs=" + numDocs); |
| + } |
| + |
| + final List<BytesRef> ids = new ArrayList<BytesRef>(); |
| + IndexReader r = null; |
| + for(int docCount=0;docCount<numDocs;docCount++) { |
| + final Document doc = docs.nextDoc(); |
| + ids.add(new BytesRef(doc.get("docid"))); |
| + w.addDocument(doc); |
| + if (random.nextInt(20) == 17) { |
| + if (r == null) { |
| + r = IndexReader.open(w.w, false); |
| + } else { |
| + final IndexReader r2 = r.reopen(); |
| + if (r2 != r) { |
| + r.close(); |
| + r = r2; |
| + } |
| + } |
| + assertEquals(1+docCount, r.numDocs()); |
| + final IndexSearcher s = new IndexSearcher(r); |
| + // Just make sure search can run; we can't assert |
| + // totHits since it could be 0 |
| + TopDocs hits = s.search(new TermQuery(new Term("body", "the")), 10); |
| + // System.out.println("tot hits " + hits.totalHits); |
| + } |
| + } |
| + |
| + if (r != null) { |
| + r.close(); |
| + } |
| + |
| + // Close should force cache to clear since all files are sync'd |
| + w.close(); |
| + |
| + final String[] cachedFiles = cachedDir.listCachedFiles(); |
| + for(String file : cachedFiles) { |
| + System.out.println("FAIL: cached file " + file + " remains after sync"); |
| + } |
| + assertEquals(0, cachedFiles.length); |
| + |
| + r = IndexReader.open(dir); |
| + for(BytesRef id : ids) { |
| + assertEquals(1, r.docFreq("docid", id)); |
| + } |
| + r.close(); |
| + cachedDir.close(); |
| + } |
| + |
| + // NOTE: not a test; just here to make sure the code frag |
| + // in the javadocs is correct! |
| + public void verifyCompiles() throws Exception { |
| + Analyzer analyzer = null; |
| + |
| + Directory fsDir = FSDirectory.open(new File("/path/to/index")); |
| + NRTCachingDirectory cachedFSDir = new NRTCachingDirectory(fsDir, 2.0, 25.0); |
| + IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_32, analyzer); |
| + conf.setMergeScheduler(cachedFSDir.getMergeScheduler()); |
| + IndexWriter writer = new IndexWriter(cachedFSDir, conf); |
| + } |
| +} |
| |
| Property changes on: lucene/contrib/misc/src/test/org/apache/lucene/store/TestNRTCachingDirectory.java |
| ___________________________________________________________________ |
| Added: svn:eol-style |
| + native |
| |
| Index: lucene/contrib/misc/src/java/org/apache/lucene/store/NRTCachingDirectory.java |
| =================================================================== |
| --- lucene/contrib/misc/src/java/org/apache/lucene/store/NRTCachingDirectory.java (revision 0) |
| +++ lucene/contrib/misc/src/java/org/apache/lucene/store/NRTCachingDirectory.java (revision 0) |
| @@ -0,0 +1,288 @@ |
| +package org.apache.lucene.store; |
| + |
| +/** |
| + * Licensed to the Apache Software Foundation (ASF) under one or more |
| + * contributor license agreements. See the NOTICE file distributed with |
| + * this work for additional information regarding copyright ownership. |
| + * The ASF licenses this file to You under the Apache License, Version 2.0 |
| + * (the "License"); you may not use this file except in compliance with |
| + * the License. You may obtain a copy of the License at |
| + * |
| + * http://www.apache.org/licenses/LICENSE-2.0 |
| + * |
| + * Unless required by applicable law or agreed to in writing, software |
| + * distributed under the License is distributed on an "AS IS" BASIS, |
| + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| + * See the License for the specific language governing permissions and |
| + * limitations under the License. |
| + */ |
| + |
| +import java.io.IOException; |
| +import java.util.Collection; |
| +import java.util.HashSet; |
| +import java.util.Set; |
| +import java.util.concurrent.ConcurrentHashMap; |
| + |
| +import org.apache.lucene.index.ConcurrentMergeScheduler; |
| +import org.apache.lucene.index.IndexFileNames; |
| +import org.apache.lucene.index.IndexWriter; // javadocs |
| +import org.apache.lucene.index.MergePolicy; |
| +import org.apache.lucene.index.MergeScheduler; |
| +import org.apache.lucene.util.IOUtils; |
| + |
| +// TODO |
| +// - let subclass dictate policy...? |
| +// - rename to MergeCacheingDir? NRTCachingDir |
| + |
| +/** |
| + * Wraps a {@link RAMDirectory} |
| + * around any provided delegate directory, to |
| + * be used during NRT search. Make sure you pull the merge |
| + * scheduler using {@link #getMergeScheduler} and pass that to your |
| + * {@link IndexWriter}; this class uses that to keep track of which |
| + * merges are being done by which threads, to decide when to |
| + * cache each written file. |
| + * |
| + * <p>This class is likely only useful in a near-real-time |
| + * context, where indexing rate is lowish but reopen |
| + * rate is highish, resulting in many tiny files being |
| + * written. This directory keeps such segments (as well as |
| + * the segments produced by merging them, as long as they |
| + * are small enough), in RAM.</p> |
| + * |
| + * <p>This is safe to use: when your app calls {IndexWriter#commit}, |
| + * all cached files will be flushed from the cached and sync'd.</p> |
| + * |
| + * <p><b>NOTE</b>: this class is somewhat sneaky in its |
| + * approach for spying on merges to determine the size of a |
| + * merge: it records which threads are running which merges |
| + * by watching ConcurrentMergeScheduler's doMerge method. |
| + * While this works correctly, likely future versions of |
| + * this class will take a more general approach. |
| + * |
| + * <p>Here's a simple example usage: |
| + * |
| + * <pre> |
| + * Directory fsDir = FSDirectory.open(new File("/path/to/index")); |
| + * NRTCachingDirectory cachedFSDir = new NRTCachingDirectory(fsDir, 5.0, 60.0); |
| + * IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_32, analyzer); |
| + * conf.setMergeScheduler(cachedFSDir.getMergeScheduler()); |
| + * IndexWriter writer = new IndexWriter(cachedFSDir, conf); |
| + * </pre> |
| + * |
| + * <p>This will cache all newly flushed segments, all merges |
| + * whose expected segment size is <= 5 MB, unless the net |
| + * cached bytes exceeds 60 MB at which point all writes will |
| + * not be cached (until the net bytes falls below 60 MB).</p> |
| + * |
| + * @lucene.experimental |
| + */ |
| + |
| +public class NRTCachingDirectory extends Directory { |
| + |
| + private final RAMDirectory cache = new RAMDirectory(); |
| + |
| + private final Directory delegate; |
| + |
| + private final long maxMergeSizeBytes; |
| + private final long maxCachedBytes; |
| + |
| + private static final boolean VERBOSE = false; |
| + |
| + /** |
| + * We will cache a newly created output if 1) it's a |
| + * flush or a merge and the estimated size of the merged segmnt is <= |
| + * maxMergeSizeMB, and 2) the total cached bytes is <= |
| + * maxCachedMB */ |
| + public NRTCachingDirectory(Directory delegate, double maxMergeSizeMB, double maxCachedMB) { |
| + this.delegate = delegate; |
| + maxMergeSizeBytes = (long) (maxMergeSizeMB*1024*1024); |
| + maxCachedBytes = (long) (maxCachedMB*1024*1024); |
| + } |
| + |
| + @Override |
| + public synchronized String[] listAll() throws IOException { |
| + final Set<String> files = new HashSet<String>(); |
| + for(String f : cache.listAll()) { |
| + files.add(f); |
| + } |
| + for(String f : delegate.listAll()) { |
| + assert !files.contains(f); |
| + files.add(f); |
| + } |
| + return files.toArray(new String[files.size()]); |
| + } |
| + |
| + /** Returns how many bytes are being used by the |
| + * RAMDirectory cache */ |
| + public long sizeInBytes() { |
| + return cache.sizeInBytes(); |
| + } |
| + |
| + @Override |
| + public synchronized boolean fileExists(String name) throws IOException { |
| + return cache.fileExists(name) || delegate.fileExists(name); |
| + } |
| + |
| + @Override |
| + public synchronized long fileModified(String name) throws IOException { |
| + if (cache.fileExists(name)) { |
| + return cache.fileModified(name); |
| + } else { |
| + return delegate.fileModified(name); |
| + } |
| + } |
| + |
| + @Override |
| + public synchronized void touchFile(String name) throws IOException { |
| + if (cache.fileExists(name)) { |
| + cache.touchFile(name); |
| + } else { |
| + delegate.touchFile(name); |
| + } |
| + } |
| + |
| + @Override |
| + public synchronized void deleteFile(String name) throws IOException { |
| + // Delete from both, in case we are currently uncaching: |
| + if (VERBOSE) { |
| + System.out.println("nrtdir.deleteFile name=" + name); |
| + } |
| + cache.deleteFile(name); |
| + delegate.deleteFile(name); |
| + } |
| + |
| + @Override |
| + public synchronized long fileLength(String name) throws IOException { |
| + if (cache.fileExists(name)) { |
| + return cache.fileLength(name); |
| + } else { |
| + return delegate.fileLength(name); |
| + } |
| + } |
| + |
| + public String[] listCachedFiles() { |
| + return cache.listAll(); |
| + } |
| + |
| + @Override |
| + public IndexOutput createOutput(String name) throws IOException { |
| + if (VERBOSE) { |
| + System.out.println("nrtdir.createOutput name=" + name); |
| + } |
| + if (doCacheWrite(name)) { |
| + if (VERBOSE) { |
| + System.out.println(" to cache"); |
| + } |
| + return cache.createOutput(name); |
| + } else { |
| + return delegate.createOutput(name); |
| + } |
| + } |
| + |
| + @Override |
| + public void sync(Collection<String> fileNames) throws IOException { |
| + if (VERBOSE) { |
| + System.out.println("nrtdir.sync files=" + fileNames); |
| + } |
| + for(String fileName : fileNames) { |
| + unCache(fileName); |
| + } |
| + delegate.sync(fileNames); |
| + } |
| + |
| + @Override |
| + public synchronized IndexInput openInput(String name) throws IOException { |
| + if (VERBOSE) { |
| + System.out.println("nrtdir.openInput name=" + name); |
| + } |
| + if (cache.fileExists(name)) { |
| + if (VERBOSE) { |
| + System.out.println(" from cache"); |
| + } |
| + return cache.openInput(name); |
| + } else { |
| + return delegate.openInput(name); |
| + } |
| + } |
| + |
| + @Override |
| + public synchronized IndexInput openInput(String name, int bufferSize) throws IOException { |
| + if (cache.fileExists(name)) { |
| + return cache.openInput(name, bufferSize); |
| + } else { |
| + return delegate.openInput(name, bufferSize); |
| + } |
| + } |
| + |
| + @Override |
| + public Lock makeLock(String name) { |
| + return delegate.makeLock(name); |
| + } |
| + |
| + @Override |
| + public void clearLock(String name) throws IOException { |
| + delegate.clearLock(name); |
| + } |
| + |
| + /** Close thius directory, which flushes any cached files |
| + * to the delegate and then closes the delegate. */ |
| + @Override |
| + public void close() throws IOException { |
| + for(String fileName : cache.listAll()) { |
| + unCache(fileName); |
| + } |
| + cache.close(); |
| + delegate.close(); |
| + } |
| + |
| + private final ConcurrentHashMap<Thread,MergePolicy.OneMerge> merges = new ConcurrentHashMap<Thread,MergePolicy.OneMerge>(); |
| + |
| + public MergeScheduler getMergeScheduler() { |
| + return new ConcurrentMergeScheduler() { |
| + @Override |
| + protected void doMerge(MergePolicy.OneMerge merge) throws IOException { |
| + try { |
| + merges.put(Thread.currentThread(), merge); |
| + super.doMerge(merge); |
| + } finally { |
| + merges.remove(Thread.currentThread()); |
| + } |
| + } |
| + }; |
| + } |
| + |
| + /** Subclass can override this to customize logic; return |
| + * true if this file should be written to the RAMDirectory. */ |
| + protected boolean doCacheWrite(String name) { |
| + final MergePolicy.OneMerge merge = merges.get(Thread.currentThread()); |
| + //System.out.println(Thread.currentThread().getName() + ": CACHE check merge=" + merge + " size=" + (merge==null ? 0 : merge.estimatedMergeBytes)); |
| + return !name.equals(IndexFileNames.SEGMENTS_GEN) && (merge == null || merge.estimatedMergeBytes <= maxMergeSizeBytes) && cache.sizeInBytes() <= maxCachedBytes; |
| + } |
| + |
| + private void unCache(String fileName) throws IOException { |
| + final IndexOutput out; |
| + synchronized(this) { |
| + if (!delegate.fileExists(fileName)) { |
| + assert cache.fileExists(fileName); |
| + out = delegate.createOutput(fileName); |
| + } else { |
| + out = null; |
| + } |
| + } |
| + |
| + if (out != null) { |
| + IndexInput in = null; |
| + try { |
| + in = cache.openInput(fileName); |
| + in.copyBytes(out, in.length()); |
| + } finally { |
| + IOUtils.closeSafely(in, out); |
| + } |
| + synchronized(this) { |
| + cache.deleteFile(fileName); |
| + } |
| + } |
| + } |
| +} |
| + |
| |
| Property changes on: lucene/contrib/misc/src/java/org/apache/lucene/store/NRTCachingDirectory.java |
| ___________________________________________________________________ |
| Added: svn:eol-style |
| + native |
| |