blob: db96117125118812bbd2e0b6af991ee7898e3605 [file] [log] [blame]
Index: lucene/src/java/org/apache/lucene/index/
--- lucene/src/java/org/apache/lucene/index/ (revision 1104081)
+++ lucene/src/java/org/apache/lucene/index/ (working copy)
@@ -72,7 +72,7 @@
long mergeGen; // used by IndexWriter
boolean isExternal; // used by IndexWriter
int maxNumSegmentsOptimize; // used by IndexWriter
- long estimatedMergeBytes; // used by IndexWriter
+ public long estimatedMergeBytes; // used by IndexWriter
List<SegmentReader> readers; // used by IndexWriter
List<SegmentReader> readerClones; // used by IndexWriter
public final List<SegmentInfo> segments;
Index: lucene/contrib/CHANGES.txt
--- lucene/contrib/CHANGES.txt (revision 1104081)
+++ lucene/contrib/CHANGES.txt (working copy)
@@ -85,6 +85,11 @@
it cannot group by function queries nor arbitrary queries. (Mike
+ * LUCENE-3092: Added NRTCachingDirectory in contrib/misc, which
+ caches small segments in RAM. This is useful, in near-real-time
+ case where the indexing rate is lowish but the reopen rate is
+ highish, to take load off the IO system. (Mike McCandless)
* LUCENE-3040: Switch all analysis consumers (highlighter, morelikethis, memory, ...)
Index: lucene/contrib/misc/src/test/org/apache/lucene/store/
--- lucene/contrib/misc/src/test/org/apache/lucene/store/ (revision 0)
+++ lucene/contrib/misc/src/test/org/apache/lucene/store/ (revision 0)
@@ -0,0 +1,120 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LineFileDocs;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.Version;
+import org.apache.lucene.util._TestUtil;
+public class TestNRTCachingDirectory extends LuceneTestCase {
+ public void testNRTAndCommit() throws Exception {
+ Directory dir = newDirectory();
+ NRTCachingDirectory cachedDir = new NRTCachingDirectory(dir, 2.0, 25.0);
+ IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
+ conf.setMergeScheduler(cachedDir.getMergeScheduler());
+ RandomIndexWriter w = new RandomIndexWriter(random, cachedDir, conf);
+ w.w.setInfoStream(VERBOSE ? System.out : null);
+ final LineFileDocs docs = new LineFileDocs(random);
+ final int numDocs = _TestUtil.nextInt(random, 100, 400);
+ if (VERBOSE) {
+ System.out.println("TEST: numDocs=" + numDocs);
+ }
+ final List<BytesRef> ids = new ArrayList<BytesRef>();
+ IndexReader r = null;
+ for(int docCount=0;docCount<numDocs;docCount++) {
+ final Document doc = docs.nextDoc();
+ ids.add(new BytesRef(doc.get("docid")));
+ w.addDocument(doc);
+ if (random.nextInt(20) == 17) {
+ if (r == null) {
+ r =, false);
+ } else {
+ final IndexReader r2 = r.reopen();
+ if (r2 != r) {
+ r.close();
+ r = r2;
+ }
+ }
+ assertEquals(1+docCount, r.numDocs());
+ final IndexSearcher s = new IndexSearcher(r);
+ // Just make sure search can run; we can't assert
+ // totHits since it could be 0
+ TopDocs hits = TermQuery(new Term("body", "the")), 10);
+ // System.out.println("tot hits " + hits.totalHits);
+ }
+ }
+ if (r != null) {
+ r.close();
+ }
+ // Close should force cache to clear since all files are sync'd
+ w.close();
+ final String[] cachedFiles = cachedDir.listCachedFiles();
+ for(String file : cachedFiles) {
+ System.out.println("FAIL: cached file " + file + " remains after sync");
+ }
+ assertEquals(0, cachedFiles.length);
+ r =;
+ for(BytesRef id : ids) {
+ assertEquals(1, r.docFreq("docid", id));
+ }
+ r.close();
+ cachedDir.close();
+ }
+ // NOTE: not a test; just here to make sure the code frag
+ // in the javadocs is correct!
+ public void verifyCompiles() throws Exception {
+ Analyzer analyzer = null;
+ Directory fsDir = File("/path/to/index"));
+ NRTCachingDirectory cachedFSDir = new NRTCachingDirectory(fsDir, 2.0, 25.0);
+ IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_32, analyzer);
+ conf.setMergeScheduler(cachedFSDir.getMergeScheduler());
+ IndexWriter writer = new IndexWriter(cachedFSDir, conf);
+ }
Property changes on: lucene/contrib/misc/src/test/org/apache/lucene/store/
Added: svn:eol-style
+ native
Index: lucene/contrib/misc/src/java/org/apache/lucene/store/
--- lucene/contrib/misc/src/java/org/apache/lucene/store/ (revision 0)
+++ lucene/contrib/misc/src/java/org/apache/lucene/store/ (revision 0)
@@ -0,0 +1,288 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.lucene.index.ConcurrentMergeScheduler;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexWriter; // javadocs
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.MergeScheduler;
+import org.apache.lucene.util.IOUtils;
+// TODO
+// - let subclass dictate policy...?
+// - rename to MergeCacheingDir? NRTCachingDir
+ * Wraps a {@link RAMDirectory}
+ * around any provided delegate directory, to
+ * be used during NRT search. Make sure you pull the merge
+ * scheduler using {@link #getMergeScheduler} and pass that to your
+ * {@link IndexWriter}; this class uses that to keep track of which
+ * merges are being done by which threads, to decide when to
+ * cache each written file.
+ *
+ * <p>This class is likely only useful in a near-real-time
+ * context, where indexing rate is lowish but reopen
+ * rate is highish, resulting in many tiny files being
+ * written. This directory keeps such segments (as well as
+ * the segments produced by merging them, as long as they
+ * are small enough), in RAM.</p>
+ *
+ * <p>This is safe to use: when your app calls {IndexWriter#commit},
+ * all cached files will be flushed from the cached and sync'd.</p>
+ *
+ * <p><b>NOTE</b>: this class is somewhat sneaky in its
+ * approach for spying on merges to determine the size of a
+ * merge: it records which threads are running which merges
+ * by watching ConcurrentMergeScheduler's doMerge method.
+ * While this works correctly, likely future versions of
+ * this class will take a more general approach.
+ *
+ * <p>Here's a simple example usage:
+ *
+ * <pre>
+ * Directory fsDir = File("/path/to/index"));
+ * NRTCachingDirectory cachedFSDir = new NRTCachingDirectory(fsDir, 5.0, 60.0);
+ * IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_32, analyzer);
+ * conf.setMergeScheduler(cachedFSDir.getMergeScheduler());
+ * IndexWriter writer = new IndexWriter(cachedFSDir, conf);
+ * </pre>
+ *
+ * <p>This will cache all newly flushed segments, all merges
+ * whose expected segment size is <= 5 MB, unless the net
+ * cached bytes exceeds 60 MB at which point all writes will
+ * not be cached (until the net bytes falls below 60 MB).</p>
+ *
+ * @lucene.experimental
+ */
+public class NRTCachingDirectory extends Directory {
+ private final RAMDirectory cache = new RAMDirectory();
+ private final Directory delegate;
+ private final long maxMergeSizeBytes;
+ private final long maxCachedBytes;
+ private static final boolean VERBOSE = false;
+ /**
+ * We will cache a newly created output if 1) it's a
+ * flush or a merge and the estimated size of the merged segmnt is <=
+ * maxMergeSizeMB, and 2) the total cached bytes is <=
+ * maxCachedMB */
+ public NRTCachingDirectory(Directory delegate, double maxMergeSizeMB, double maxCachedMB) {
+ this.delegate = delegate;
+ maxMergeSizeBytes = (long) (maxMergeSizeMB*1024*1024);
+ maxCachedBytes = (long) (maxCachedMB*1024*1024);
+ }
+ @Override
+ public synchronized String[] listAll() throws IOException {
+ final Set<String> files = new HashSet<String>();
+ for(String f : cache.listAll()) {
+ files.add(f);
+ }
+ for(String f : delegate.listAll()) {
+ assert !files.contains(f);
+ files.add(f);
+ }
+ return files.toArray(new String[files.size()]);
+ }
+ /** Returns how many bytes are being used by the
+ * RAMDirectory cache */
+ public long sizeInBytes() {
+ return cache.sizeInBytes();
+ }
+ @Override
+ public synchronized boolean fileExists(String name) throws IOException {
+ return cache.fileExists(name) || delegate.fileExists(name);
+ }
+ @Override
+ public synchronized long fileModified(String name) throws IOException {
+ if (cache.fileExists(name)) {
+ return cache.fileModified(name);
+ } else {
+ return delegate.fileModified(name);
+ }
+ }
+ @Override
+ public synchronized void touchFile(String name) throws IOException {
+ if (cache.fileExists(name)) {
+ cache.touchFile(name);
+ } else {
+ delegate.touchFile(name);
+ }
+ }
+ @Override
+ public synchronized void deleteFile(String name) throws IOException {
+ // Delete from both, in case we are currently uncaching:
+ if (VERBOSE) {
+ System.out.println("nrtdir.deleteFile name=" + name);
+ }
+ cache.deleteFile(name);
+ delegate.deleteFile(name);
+ }
+ @Override
+ public synchronized long fileLength(String name) throws IOException {
+ if (cache.fileExists(name)) {
+ return cache.fileLength(name);
+ } else {
+ return delegate.fileLength(name);
+ }
+ }
+ public String[] listCachedFiles() {
+ return cache.listAll();
+ }
+ @Override
+ public IndexOutput createOutput(String name) throws IOException {
+ if (VERBOSE) {
+ System.out.println("nrtdir.createOutput name=" + name);
+ }
+ if (doCacheWrite(name)) {
+ if (VERBOSE) {
+ System.out.println(" to cache");
+ }
+ return cache.createOutput(name);
+ } else {
+ return delegate.createOutput(name);
+ }
+ }
+ @Override
+ public void sync(Collection<String> fileNames) throws IOException {
+ if (VERBOSE) {
+ System.out.println("nrtdir.sync files=" + fileNames);
+ }
+ for(String fileName : fileNames) {
+ unCache(fileName);
+ }
+ delegate.sync(fileNames);
+ }
+ @Override
+ public synchronized IndexInput openInput(String name) throws IOException {
+ if (VERBOSE) {
+ System.out.println("nrtdir.openInput name=" + name);
+ }
+ if (cache.fileExists(name)) {
+ if (VERBOSE) {
+ System.out.println(" from cache");
+ }
+ return cache.openInput(name);
+ } else {
+ return delegate.openInput(name);
+ }
+ }
+ @Override
+ public synchronized IndexInput openInput(String name, int bufferSize) throws IOException {
+ if (cache.fileExists(name)) {
+ return cache.openInput(name, bufferSize);
+ } else {
+ return delegate.openInput(name, bufferSize);
+ }
+ }
+ @Override
+ public Lock makeLock(String name) {
+ return delegate.makeLock(name);
+ }
+ @Override
+ public void clearLock(String name) throws IOException {
+ delegate.clearLock(name);
+ }
+ /** Close thius directory, which flushes any cached files
+ * to the delegate and then closes the delegate. */
+ @Override
+ public void close() throws IOException {
+ for(String fileName : cache.listAll()) {
+ unCache(fileName);
+ }
+ cache.close();
+ delegate.close();
+ }
+ private final ConcurrentHashMap<Thread,MergePolicy.OneMerge> merges = new ConcurrentHashMap<Thread,MergePolicy.OneMerge>();
+ public MergeScheduler getMergeScheduler() {
+ return new ConcurrentMergeScheduler() {
+ @Override
+ protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
+ try {
+ merges.put(Thread.currentThread(), merge);
+ super.doMerge(merge);
+ } finally {
+ merges.remove(Thread.currentThread());
+ }
+ }
+ };
+ }
+ /** Subclass can override this to customize logic; return
+ * true if this file should be written to the RAMDirectory. */
+ protected boolean doCacheWrite(String name) {
+ final MergePolicy.OneMerge merge = merges.get(Thread.currentThread());
+ //System.out.println(Thread.currentThread().getName() + ": CACHE check merge=" + merge + " size=" + (merge==null ? 0 : merge.estimatedMergeBytes));
+ return !name.equals(IndexFileNames.SEGMENTS_GEN) && (merge == null || merge.estimatedMergeBytes <= maxMergeSizeBytes) && cache.sizeInBytes() <= maxCachedBytes;
+ }
+ private void unCache(String fileName) throws IOException {
+ final IndexOutput out;
+ synchronized(this) {
+ if (!delegate.fileExists(fileName)) {
+ assert cache.fileExists(fileName);
+ out = delegate.createOutput(fileName);
+ } else {
+ out = null;
+ }
+ }
+ if (out != null) {
+ IndexInput in = null;
+ try {
+ in = cache.openInput(fileName);
+ in.copyBytes(out, in.length());
+ } finally {
+ IOUtils.closeSafely(in, out);
+ }
+ synchronized(this) {
+ cache.deleteFile(fileName);
+ }
+ }
+ }
Property changes on: lucene/contrib/misc/src/java/org/apache/lucene/store/
Added: svn:eol-style
+ native