// blob: 4a87dfb2c9d1456ad0719225fd1bafcdafacd36f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import kafka.server.Constants._
import kafka.server.ReplicationQuotaManagerConfig._
import kafka.utils.CoreUtils._
import kafka.utils.Logging
import org.apache.kafka.common.metrics._
import java.util.concurrent.locks.ReentrantReadWriteLock
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.stats.SimpleRate
import org.apache.kafka.common.utils.Time
/**
* Configuration settings for quota management
*
* @param quotaBytesPerSecondDefault The default bytes per second quota allocated to internal replication
* @param numQuotaSamples The number of samples to retain in memory
* @param quotaWindowSizeSeconds The time span of each sample
*
*/
case class ReplicationQuotaManagerConfig(quotaBytesPerSecondDefault: Long = QuotaBytesPerSecondDefault, // companion default is Long.MaxValue
numQuotaSamples: Int = DefaultNumQuotaSamples, // handed to MetricConfig.samples when the quota metric is configured
quotaWindowSizeSeconds: Int = DefaultQuotaWindowSizeSeconds) // handed to MetricConfig.timeWindow with TimeUnit.SECONDS
object ReplicationQuotaManagerConfig {
  /** Default quota bound in bytes per second: the largest representable Long. */
  val QuotaBytesPerSecondDefault: Long = Long.MaxValue

  /** Default number of samples kept: 10 whole windows plus the current window. */
  val DefaultNumQuotaSamples: Int = 11

  /** Default length of a single sample window, in seconds. */
  val DefaultQuotaWindowSizeSeconds: Int = 1

  /** Sensors are purged after this many seconds (one hour) of inactivity. */
  val InactiveSensorExpirationTimeSeconds: Int = 60 * 60
}
/**
* Query interface for replication throttling state; implemented in this file by ReplicationQuotaManager.
*/
trait ReplicaQuota {
/**
* @param topicPartition the partition to check
* @return true if the given partition is currently marked as throttled
*/
def isThrottled(topicPartition: TopicPartition): Boolean
/**
* @return true if the quota is currently exceeded
*/
def isQuotaExceeded(): Boolean
}
object Constants {
  /**
   * Sentinel partition list standing for "all replicas of a topic".
   * ReplicationQuotaManager.isThrottled recognises it by reference (`eq`), so callers must
   * pass this exact instance rather than an equal copy.
   */
  val AllReplicas: Seq[Int] = Seq(-1)
}
/**
* Tracks replication metrics and compares them against any quotas for throttled partitions.
*
* @param config The quota configs
* @param metrics The Metrics instance
* @param replicationType The name / key for this quota manager, typically leader or follower
* @param time Time object to use
*/
class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
                              private val metrics: Metrics,
                              private val replicationType: QuotaType,
                              private val time: Time) extends Logging with ReplicaQuota {
  // Guards reads and writes of `quota`; the same lock is handed to SensorAccess when the sensor is fetched.
  private val lock = new ReentrantReadWriteLock()
  // topic -> throttled partition ids; the Constants.AllReplicas instance marks the whole topic as throttled.
  private val throttledPartitions = new ConcurrentHashMap[String, Seq[Int]]()
  // Stays null until updateQuota is called for the first time.
  private var quota: Quota = null
  private val sensorAccess = new SensorAccess
  private val rateMetricName = metrics.metricName("byte-rate", replicationType.toString, s"Tracking byte-rate for ${replicationType}")

  /**
   * Update the quota. The new value is stored under the write lock and, if the byte-rate metric
   * is currently registered, that metric is reconfigured with the new quota.
   *
   * @param quota the quota to apply from now on
   */
  def updateQuota(quota: Quota): Unit = {
    inWriteLock(lock) {
      this.quota = quota
      // The metric could be expired by another thread, so look it up once and skip the update when it is absent.
      Option(metrics.metrics.get(rateMetricName)).foreach(_.config(getQuotaMetricConfig(quota)))
    }
  }

  /**
   * Check if the quota is currently exceeded
   *
   * @return true if checking the sensor's quotas raised a QuotaViolationException, false otherwise
   */
  override def isQuotaExceeded(): Boolean = {
    try {
      sensor().checkQuotas()
      false
    } catch {
      case qve: QuotaViolationException =>
        trace("%s: Quota violated for sensor (%s), metric: (%s), metric-value: (%f), bound: (%f)".format(replicationType, sensor().name(), qve.metricName, qve.value, qve.bound))
        true
    }
  }

  /**
   * Is the passed partition throttled by this ReplicationQuotaManager
   *
   * @param topicPartition the partition to check
   * @return true if the partition's topic is marked with the AllReplicas sentinel, or the partition id
   *         is in the list registered for that topic; false if nothing is registered for the topic
   */
  override def isThrottled(topicPartition: TopicPartition): Boolean = {
    Option(throttledPartitions.get(topicPartition.topic)).exists { partitions =>
      // The sentinel is matched by reference on purpose: only the shared AllReplicas instance means "all".
      (partitions eq AllReplicas) || partitions.contains(topicPartition.partition)
    }
  }

  /**
   * Add the passed value to the throttled rate. This method ignores the quota with
   * the value being added to the rate even if the quota is exceeded
   *
   * @param value the amount to record against the byte-rate sensor
   */
  def record(value: Long): Unit = {
    try {
      sensor().record(value)
    } catch {
      // A violation raised while recording is deliberately swallowed; it is only traced.
      case qve: QuotaViolationException =>
        trace(s"Record: Quota violated, but ignored, for sensor (${sensor().name}), metric: (${qve.metricName}), value : (${qve.value}), bound: (${qve.bound}), recordedValue ($value)")
    }
  }

  /**
   * Update the set of throttled partitions for this QuotaManager. The partitions passed, for
   * any single topic, will replace any previously registered for that topic.
   *
   * @param topic the topic whose throttled partitions are being set
   * @param partitions the set of throttled partitions
   */
  def markThrottled(topic: String, partitions: Seq[Int]): Unit = {
    throttledPartitions.put(topic, partitions)
  }

  /**
   * Mark all replicas for this topic as throttled
   *
   * @param topic the topic to throttle entirely
   */
  def markThrottled(topic: String): Unit = {
    markThrottled(topic, AllReplicas)
  }

  /**
   * Remove list of throttled replicas for a certain topic
   *
   * @param topic the topic whose throttle entry is removed
   */
  def removeThrottle(topic: String): Unit = {
    throttledPartitions.remove(topic)
  }

  /**
   * Returns the bound of the configured quota
   *
   * @return the quota bound converted to a Long, or Long.MaxValue when no quota has been set yet
   */
  def upperBound(): Long = {
    inReadLock(lock) {
      Option(quota).fold(Long.MaxValue)(_.bound().toLong)
    }
  }

  /**
   * Build the metric configuration for the given quota from this manager's window settings.
   *
   * @param quota the quota to attach to the configuration
   * @return a MetricConfig using the configured window size (seconds) and sample count
   */
  private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
    new MetricConfig()
      .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
      .samples(config.numQuotaSamples)
      .quota(quota)
  }

  /**
   * Fetch the byte-rate sensor for this replication type through SensorAccess.getOrCreate,
   * supplying the metric name, the current quota configuration and a SimpleRate stat.
   *
   * @return the sensor returned by SensorAccess
   */
  private def sensor(): Sensor = {
    sensorAccess.getOrCreate(
      replicationType.toString,
      InactiveSensorExpirationTimeSeconds,
      lock,
      metrics,
      () => rateMetricName,
      () => getQuotaMetricConfig(quota),
      () => new SimpleRate()
    )
  }
}