| Index: lucene/CHANGES.txt |
| =================================================================== |
| --- lucene/CHANGES.txt (revision 1139215) |
| +++ lucene/CHANGES.txt (working copy) |
| @@ -438,6 +438,10 @@ |
| IndexSearcher. SortFields can have SortField.REWRITEABLE type which |
| requires they are rewritten before they are used. (Chris Male) |
| |
| +* LUCENE-3203: FSDirectory can now limit the max allowed write rate |
| + (MB/sec) of all running merges, to reduce impact ongoing merging has |
| + on searching, NRT reopen time, etc. (Mike McCandless) |
| + |
| Optimizations |
| |
| * LUCENE-2588: Don't store unnecessary suffixes when writing the terms |
| Index: lucene/src/java/org/apache/lucene/store/RateLimiter.java |
| =================================================================== |
| --- lucene/src/java/org/apache/lucene/store/RateLimiter.java (revision 0) |
| +++ lucene/src/java/org/apache/lucene/store/RateLimiter.java (revision 0) |
| @@ -0,0 +1,77 @@ |
| +package org.apache.lucene.store; |
| + |
| +/** |
| + * Licensed to the Apache Software Foundation (ASF) under one or more |
| + * contributor license agreements. See the NOTICE file distributed with |
| + * this work for additional information regarding copyright ownership. |
| + * The ASF licenses this file to You under the Apache License, Version 2.0 |
| + * (the "License"); you may not use this file except in compliance with |
| + * the License. You may obtain a copy of the License at |
| + * |
| + * http://www.apache.org/licenses/LICENSE-2.0 |
| + * |
| + * Unless required by applicable law or agreed to in writing, software |
| + * distributed under the License is distributed on an "AS IS" BASIS, |
| + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| + * See the License for the specific language governing permissions and |
| + * limitations under the License. |
| + */ |
| + |
| +import org.apache.lucene.util.ThreadInterruptedException; |
| + |
| +/** Simple class to rate limit IO. Typically it's shared |
| + * across multiple IndexInputs or IndexOutputs (for example |
| + * those involved all merging). Those IndexInputs and |
| + * IndexOutputs would call {@link #pause} whenever they |
| + * want to read bytes or write bytes. */ |
| + |
| +public class RateLimiter { |
| + private volatile double nsPerByte; |
| + private volatile long lastNS; |
| + |
| + // TODO: we could also allow eg a sub class to dynamically |
| + // determine the allowed rate, eg if an app wants to |
| + // change the allowed rate over time or something |
| + |
| + /** mbPerSec is the MB/sec max IO rate */ |
| + public RateLimiter(double mbPerSec) { |
| + setMaxRate(mbPerSec); |
| + } |
| + |
| + public void setMaxRate(double mbPerSec) { |
| + nsPerByte = 1000000000. / (1024*1024*mbPerSec); |
| + } |
| + |
| + /** Pauses, if necessary, to keep the instantaneous IO |
| + * rate at or below the target. NOTE: multiple threads |
| + * may safely use this, however the implementation is |
| + * not perfectly thread safe but likely in practice this |
| + * is harmless (just means in some rate cases the rate |
| + * might exceed the target). */ |
| + public void pause(long bytes) { |
| + |
| + // TODO: this is purely instantenous rate; maybe we |
| + // should also offer decayed recent history one? |
| + final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte)); |
| + long curNS = System.nanoTime(); |
| + if (lastNS < curNS) { |
| + lastNS = curNS; |
| + } |
| + |
| + // While loop because Thread.sleep doesn't alway sleep |
| + // enough: |
| + while(true) { |
| + final long pauseNS = targetNS - curNS; |
| + if (pauseNS > 0) { |
| + try { |
| + Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000)); |
| + } catch (InterruptedException ie) { |
| + throw new ThreadInterruptedException(ie); |
| + } |
| + curNS = System.nanoTime(); |
| + continue; |
| + } |
| + break; |
| + } |
| + } |
| +} |
| |
| Property changes on: lucene/src/java/org/apache/lucene/store/RateLimiter.java |
| ___________________________________________________________________ |
| Added: svn:eol-style |
| + native |
| |
| Index: lucene/src/java/org/apache/lucene/store/FSDirectory.java |
| =================================================================== |
| --- lucene/src/java/org/apache/lucene/store/FSDirectory.java (revision 1139215) |
| +++ lucene/src/java/org/apache/lucene/store/FSDirectory.java (working copy) |
| @@ -123,6 +123,10 @@ |
| protected final Set<String> staleFiles = synchronizedSet(new HashSet<String>()); // Files written, but not yet sync'ed |
| private int chunkSize = DEFAULT_READ_CHUNK_SIZE; // LUCENE-1566 |
| |
| + // null means no limite |
| + private Double maxMergeWriteMBPerSec; |
| + private RateLimiter mergeWriteRateLimiter; |
| + |
| // returns the canonical version of the directory, creating it if it doesn't exist. |
| private static File getCanonicalPath(File file) throws IOException { |
| return new File(file.getCanonicalPath()); |
| @@ -291,9 +295,38 @@ |
| ensureOpen(); |
| |
| ensureCanWrite(name); |
| - return new FSIndexOutput(this, name); |
| + return new FSIndexOutput(this, name, context.context == IOContext.Context.MERGE ? mergeWriteRateLimiter : null); |
| } |
| |
| + /** Sets the maximum (approx) MB/sec allowed by all IO |
| + * performed by merging. Pass null to have no limit. |
| + * |
| + * <p><b>NOTE</b>: if merges are already running there is |
| + * no guarantee this rate will apply to them; it will only |
| + * apply for certain to new merges. |
| + * |
| + * @lucene.experimental */ |
| + public synchronized void setMaxMergeWriteMBPerSec(Double mbPerSec) { |
| + maxMergeWriteMBPerSec = mbPerSec; |
| + if (mbPerSec == null) { |
| + if (mergeWriteRateLimiter != null) { |
| + mergeWriteRateLimiter.setMaxRate(Double.MAX_VALUE); |
| + mergeWriteRateLimiter = null; |
| + } |
| + } else if (mergeWriteRateLimiter != null) { |
| + mergeWriteRateLimiter.setMaxRate(mbPerSec); |
| + } else { |
| + mergeWriteRateLimiter = new RateLimiter(mbPerSec); |
| + } |
| + } |
| + |
| + /** See {@link #setMaxMergeWriteMBPerSec}. |
| + * |
| + * @lucene.experimental */ |
| + public Double getMaxMergeWriteMBPerSec() { |
| + return maxMergeWriteMBPerSec; |
| + } |
| + |
| protected void ensureCanWrite(String name) throws IOException { |
| if (!directory.exists()) |
| if (!directory.mkdirs()) |
| @@ -403,17 +436,22 @@ |
| private final String name; |
| private final RandomAccessFile file; |
| private volatile boolean isOpen; // remember if the file is open, so that we don't try to close it more than once |
| - |
| - public FSIndexOutput(FSDirectory parent, String name) throws IOException { |
| + private final RateLimiter rateLimiter; |
| + |
| + public FSIndexOutput(FSDirectory parent, String name, RateLimiter rateLimiter) throws IOException { |
| this.parent = parent; |
| this.name = name; |
| file = new RandomAccessFile(new File(parent.directory, name), "rw"); |
| isOpen = true; |
| + this.rateLimiter = rateLimiter; |
| } |
| |
| /** output methods: */ |
| @Override |
| public void flushBuffer(byte[] b, int offset, int size) throws IOException { |
| + if (rateLimiter != null) { |
| + rateLimiter.pause(size); |
| + } |
| file.write(b, offset, size); |
| } |
| |