blob: d1a144d7882919426824799ff8e8a47f89c83158 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils;
import kafka.metrics.KafkaMetricsGroup
import java.util.concurrent.TimeUnit
import java.util.Random
import scala.math._
/**
* A class to measure and throttle the rate of some process. The throttler takes a desired rate-per-second
* (the units of the process don't matter, it could be bytes or a count of some other thing), and will sleep for
* an appropriate amount of time when maybeThrottle() is called to attain the desired rate.
*
* @param desiredRatePerSec: The rate we want to hit in units/sec
* @param checkIntervalMs: The interval at which to check our rate
* @param throttleDown: Does throttling increase or decrease our rate?
* @param time: The time implementation to use
*/
@threadsafe
class Throttler(val desiredRatePerSec: Double,
val checkIntervalMs: Long = 100L,
val throttleDown: Boolean = true,
metricName: String = "throttler",
units: String = "entries",
val time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
private val lock = new Object
private val meter = newMeter(metricName, units, TimeUnit.SECONDS)
private var periodStartNs: Long = time.nanoseconds
private var observedSoFar: Double = 0.0
def maybeThrottle(observed: Double) {
meter.mark(observed.toLong)
lock synchronized {
observedSoFar += observed
val now = time.nanoseconds
val elapsedNs = now - periodStartNs
// if we have completed an interval AND we have observed something, maybe
// we should take a little nap
if(elapsedNs > checkIntervalMs * Time.NsPerMs && observedSoFar > 0) {
val rateInSecs = (observedSoFar * Time.NsPerSec) / elapsedNs
val needAdjustment = !(throttleDown ^ (rateInSecs > desiredRatePerSec))
if(needAdjustment) {
// solve for the amount of time to sleep to make us hit the desired rate
val desiredRateMs = desiredRatePerSec / Time.MsPerSec.toDouble
val elapsedMs = elapsedNs / Time.NsPerMs
val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs)
if(sleepTime > 0) {
trace("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, desiredRatePerSec, sleepTime))
time.sleep(sleepTime)
}
}
periodStartNs = now
observedSoFar = 0
}
}
}
}
object Throttler {
def main(args: Array[String]) {
val rand = new Random()
val throttler = new Throttler(100000, 100, true, time = SystemTime)
val interval = 30000
var start = System.currentTimeMillis
var total = 0
while(true) {
val value = rand.nextInt(1000)
Thread.sleep(1)
throttler.maybeThrottle(value)
total += value
val now = System.currentTimeMillis
if(now - start >= interval) {
println(total / (interval/1000.0))
start = now
total = 0
}
}
}
}