blob: 66d1572ea6c4904c48bcb37ff4573215b77ed80e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.store;
import java.io.IOException;
import org.apache.lucene.util.ThreadInterruptedException;
/**
* Abstract base class to rate limit IO. Typically implementations are shared across multiple
* IndexInputs or IndexOutputs (for example those involved all merging). Those IndexInputs and
* IndexOutputs would call {@link #pause} whenever the have read or written more than {@link
* #getMinPauseCheckBytes} bytes.
*/
public abstract class RateLimiter {
/**
* Sets an updated MB per second rate limit. A subclass is allowed to perform dynamic updates of
* the rate limit during use.
*/
public abstract void setMBPerSec(double mbPerSec);
/** The current MB per second rate limit. */
public abstract double getMBPerSec();
/**
* Pauses, if necessary, to keep the instantaneous IO rate at or below the target.
*
* <p>Note: the implementation is thread-safe
*
* @return the pause time in nano seconds
*/
public abstract long pause(long bytes) throws IOException;
/**
* How many bytes caller should add up itself before invoking {@link #pause}. NOTE: The value
* returned by this method may change over time and is not guaranteed to be constant throughout
* the lifetime of the RateLimiter. Users are advised to refresh their local values with calls to
* this method to ensure consistency.
*/
public abstract long getMinPauseCheckBytes();
/** Simple class to rate limit IO. */
public static class SimpleRateLimiter extends RateLimiter {
private static final int MIN_PAUSE_CHECK_MSEC = 5;
private volatile double mbPerSec;
private volatile long minPauseCheckBytes;
private long lastNS;
/** mbPerSec is the MB/sec max IO rate */
public SimpleRateLimiter(double mbPerSec) {
setMBPerSec(mbPerSec);
lastNS = System.nanoTime();
}
/** Sets an updated mb per second rate limit. */
@Override
public void setMBPerSec(double mbPerSec) {
this.mbPerSec = mbPerSec;
minPauseCheckBytes = (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024);
}
@Override
public long getMinPauseCheckBytes() {
return minPauseCheckBytes;
}
/** The current mb per second rate limit. */
@Override
public double getMBPerSec() {
return this.mbPerSec;
}
/**
* Pauses, if necessary, to keep the instantaneous IO rate at or below the target. Be sure to
* only call this method when bytes &gt; {@link #getMinPauseCheckBytes}, otherwise it will pause
* way too long!
*
* @return the pause time in nano seconds
*/
@Override
public long pause(long bytes) {
long startNS = System.nanoTime();
double secondsToPause = (bytes / 1024. / 1024.) / mbPerSec;
long targetNS;
// Sync'd to read + write lastNS:
synchronized (this) {
// Time we should sleep until; this is purely instantaneous
// rate (just adds seconds onto the last time we had paused to);
// maybe we should also offer decayed recent history one?
targetNS = lastNS + (long) (1000000000 * secondsToPause);
if (startNS >= targetNS) {
// OK, current time is already beyond the target sleep time,
// no pausing to do.
// Set to startNS, not targetNS, to enforce the instant rate, not
// the "averaaged over all history" rate:
lastNS = startNS;
return 0;
}
lastNS = targetNS;
}
long curNS = startNS;
// While loop because Thread.sleep doesn't always sleep
// enough:
while (true) {
final long pauseNS = targetNS - curNS;
if (pauseNS > 0) {
try {
// NOTE: except maybe on real-time JVMs, minimum realistic sleep time
// is 1 msec; if you pass just 1 nsec the default impl rounds
// this up to 1 msec:
int sleepNS;
int sleepMS;
if (pauseNS > 100000L * Integer.MAX_VALUE) {
// Not really practical (sleeping for 25 days) but we shouldn't overflow int:
sleepMS = Integer.MAX_VALUE;
sleepNS = 0;
} else {
sleepMS = (int) (pauseNS / 1000000);
sleepNS = (int) (pauseNS % 1000000);
}
Thread.sleep(sleepMS, sleepNS);
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
curNS = System.nanoTime();
continue;
}
break;
}
return curNS - startNS;
}
}
}