blob: 8a750a5eac5fc294ad35f9c3ebadbef53a77d474 [file] [log] [blame]
Index: lucene/CHANGES.txt
===================================================================
--- lucene/CHANGES.txt (revision 1139215)
+++ lucene/CHANGES.txt (working copy)
@@ -438,6 +438,10 @@
IndexSearcher. SortFields can have SortField.REWRITEABLE type which
requires they are rewritten before they are used. (Chris Male)
+* LUCENE-3203: FSDirectory can now limit the max allowed write rate
+ (MB/sec) of all running merges, to reduce impact ongoing merging has
+ on searching, NRT reopen time, etc. (Mike McCandless)
+
Optimizations
* LUCENE-2588: Don't store unnecessary suffixes when writing the terms
Index: lucene/src/java/org/apache/lucene/store/RateLimiter.java
===================================================================
--- lucene/src/java/org/apache/lucene/store/RateLimiter.java (revision 0)
+++ lucene/src/java/org/apache/lucene/store/RateLimiter.java (revision 0)
@@ -0,0 +1,77 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/** Simple class to rate limit IO. Typically it's shared
+ * across multiple IndexInputs or IndexOutputs (for example
+ * those involved all merging). Those IndexInputs and
+ * IndexOutputs would call {@link #pause} whenever they
+ * want to read bytes or write bytes. */
+
+public class RateLimiter {
+ private volatile double nsPerByte;
+ private volatile long lastNS;
+
+ // TODO: we could also allow eg a sub class to dynamically
+ // determine the allowed rate, eg if an app wants to
+ // change the allowed rate over time or something
+
+ /** mbPerSec is the MB/sec max IO rate */
+ public RateLimiter(double mbPerSec) {
+ setMaxRate(mbPerSec);
+ }
+
+ public void setMaxRate(double mbPerSec) {
+ nsPerByte = 1000000000. / (1024*1024*mbPerSec);
+ }
+
+ /** Pauses, if necessary, to keep the instantaneous IO
+ * rate at or below the target. NOTE: multiple threads
+ * may safely use this, however the implementation is
+ * not perfectly thread safe but likely in practice this
+ * is harmless (just means in some rate cases the rate
+ * might exceed the target). */
+ public void pause(long bytes) {
+
+ // TODO: this is purely instantenous rate; maybe we
+ // should also offer decayed recent history one?
+ final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
+ long curNS = System.nanoTime();
+ if (lastNS < curNS) {
+ lastNS = curNS;
+ }
+
+ // While loop because Thread.sleep doesn't alway sleep
+ // enough:
+ while(true) {
+ final long pauseNS = targetNS - curNS;
+ if (pauseNS > 0) {
+ try {
+ Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ }
+ curNS = System.nanoTime();
+ continue;
+ }
+ break;
+ }
+ }
+}
Property changes on: lucene/src/java/org/apache/lucene/store/RateLimiter.java
___________________________________________________________________
Added: svn:eol-style
+ native
Index: lucene/src/java/org/apache/lucene/store/FSDirectory.java
===================================================================
--- lucene/src/java/org/apache/lucene/store/FSDirectory.java (revision 1139215)
+++ lucene/src/java/org/apache/lucene/store/FSDirectory.java (working copy)
@@ -123,6 +123,10 @@
protected final Set<String> staleFiles = synchronizedSet(new HashSet<String>()); // Files written, but not yet sync'ed
private int chunkSize = DEFAULT_READ_CHUNK_SIZE; // LUCENE-1566
+ // null means no limite
+ private Double maxMergeWriteMBPerSec;
+ private RateLimiter mergeWriteRateLimiter;
+
// returns the canonical version of the directory, creating it if it doesn't exist.
private static File getCanonicalPath(File file) throws IOException {
return new File(file.getCanonicalPath());
@@ -291,9 +295,38 @@
ensureOpen();
ensureCanWrite(name);
- return new FSIndexOutput(this, name);
+ return new FSIndexOutput(this, name, context.context == IOContext.Context.MERGE ? mergeWriteRateLimiter : null);
}
+ /** Sets the maximum (approx) MB/sec allowed by all IO
+ * performed by merging. Pass null to have no limit.
+ *
+ * <p><b>NOTE</b>: if merges are already running there is
+ * no guarantee this rate will apply to them; it will only
+ * apply for certain to new merges.
+ *
+ * @lucene.experimental */
+ public synchronized void setMaxMergeWriteMBPerSec(Double mbPerSec) {
+ maxMergeWriteMBPerSec = mbPerSec;
+ if (mbPerSec == null) {
+ if (mergeWriteRateLimiter != null) {
+ mergeWriteRateLimiter.setMaxRate(Double.MAX_VALUE);
+ mergeWriteRateLimiter = null;
+ }
+ } else if (mergeWriteRateLimiter != null) {
+ mergeWriteRateLimiter.setMaxRate(mbPerSec);
+ } else {
+ mergeWriteRateLimiter = new RateLimiter(mbPerSec);
+ }
+ }
+
+ /** See {@link #setMaxMergeWriteMBPerSec}.
+ *
+ * @lucene.experimental */
+ public Double getMaxMergeWriteMBPerSec() {
+ return maxMergeWriteMBPerSec;
+ }
+
protected void ensureCanWrite(String name) throws IOException {
if (!directory.exists())
if (!directory.mkdirs())
@@ -403,17 +436,22 @@
private final String name;
private final RandomAccessFile file;
private volatile boolean isOpen; // remember if the file is open, so that we don't try to close it more than once
-
- public FSIndexOutput(FSDirectory parent, String name) throws IOException {
+ private final RateLimiter rateLimiter;
+
+ public FSIndexOutput(FSDirectory parent, String name, RateLimiter rateLimiter) throws IOException {
this.parent = parent;
this.name = name;
file = new RandomAccessFile(new File(parent.directory, name), "rw");
isOpen = true;
+ this.rateLimiter = rateLimiter;
}
/** output methods: */
@Override
public void flushBuffer(byte[] b, int offset, int size) throws IOException {
+ if (rateLimiter != null) {
+ rateLimiter.pause(size);
+ }
file.write(b, offset, size);
}