blob: f27dff3142f0b48076da25e63521ea0a7f8f3e49 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.concurrent.TimeUnit
import com.yammer.metrics.core.Meter
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Pool
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import scala.collection._
/**
 * Per-partition status tracked by a delayed produce operation.
 *
 * @param requiredOffset the offset that the required number of replicas must reach
 *                       before this partition is considered satisfied
 * @param responseStatus the (mutable) response that will eventually be handed back
 *                       to the produce response callback for this partition
 */
case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
  // True while this partition is still waiting for enough replica acks;
  // cleared once the partition is satisfied (or fails with an error).
  @volatile var acksPending = false

  override def toString: String =
    s"[acksPending: $acksPending, error: ${responseStatus.error.code}, " +
      s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset]"
}
/**
 * The produce metadata maintained by the delayed produce operation.
 *
 * @param produceRequiredAcks the `acks` setting of the originating produce request
 * @param produceStatus       per-partition pending status for this operation
 */
case class ProduceMetadata(produceRequiredAcks: Short,
                           produceStatus: Map[TopicPartition, ProducePartitionStatus]) {

  override def toString: String =
    s"[requiredAcks: $produceRequiredAcks, partitionStatus: $produceStatus]"
}
/**
 * A delayed produce operation that can be created by the replica manager and watched
 * in the produce operation purgatory.
 *
 * @param delayMs          how long to wait before timing the operation out
 * @param produceMetadata  required acks plus per-partition pending status
 * @param replicaManager   used to look up partitions and check replica progress
 * @param responseCallback invoked exactly once, on completion, with the final
 *                         per-partition responses
 */
class DelayedProduce(delayMs: Long,
                     produceMetadata: ProduceMetadata,
                     replicaManager: ReplicaManager,
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit)
  extends DelayedOperation(delayMs) {

  // First update the acks pending variable according to the error code:
  // partitions that appended locally without error start out pending, with a
  // provisional timeout error that is cleared once the required acks arrive.
  produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
    if (status.responseStatus.error == Errors.NONE) {
      // Timeout error state will be cleared when required acks are received
      status.acksPending = true
      status.responseStatus.error = Errors.REQUEST_TIMED_OUT
    } else {
      // Local append already failed; nothing further to wait for.
      status.acksPending = false
    }
    trace("Initial partition status for %s is %s".format(topicPartition, status))
  }

  /**
   * The delayed produce operation can be completed if every partition
   * it produces to is satisfied by one of the following:
   *
   * Case A: This broker is no longer the leader: set an error in response
   * Case B: This broker is the leader:
   *   B.1 - If there was a local error thrown while checking if at least requiredAcks
   *         replicas have caught up to this operation: set an error in response
   *   B.2 - Otherwise, set the response with no error.
   */
  override def tryComplete(): Boolean = {
    // check for each partition if it still has pending acks
    produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
      trace(s"Checking produce satisfaction for ${topicPartition}, current status $status")
      // skip those partitions that have already been satisfied
      if (status.acksPending) {
        val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
          case Some(partition) =>
            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
          case None =>
            // Case A: the partition is gone from this broker
            (false, Errors.UNKNOWN_TOPIC_OR_PARTITION)
        }
        // Case B.1 || B.2: either a definitive error or enough replicas caught up
        if (error != Errors.NONE || hasEnough) {
          status.acksPending = false
          status.responseStatus.error = error
        }
      }
    }

    // check if every partition has satisfied at least one of case A or B
    if (!produceMetadata.produceStatus.values.exists(_.acksPending))
      forceComplete()
    else
      false
  }

  /**
   * Record expiration metrics for every partition that was still waiting for
   * acks when the operation timed out.
   */
  override def onExpiration(): Unit = {
    produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
      if (status.acksPending) {
        DelayedProduceMetrics.recordExpiration(topicPartition)
      }
    }
  }

  /**
   * Upon completion, return the current response status along with the error code per partition.
   */
  override def onComplete(): Unit = {
    // Build a strict map here: `mapValues` would return a lazy view over the
    // mutable ProducePartitionStatus objects, so the callback could observe
    // later mutations of `responseStatus` instead of a completed snapshot, and
    // each traversal would re-evaluate the mapping. (It is also deprecated in
    // Scala 2.13.)
    val responseStatus = produceMetadata.produceStatus.map { case (topicPartition, status) =>
      topicPartition -> status.responseStatus
    }
    responseCallback(responseStatus)
  }
}
/**
 * Expiration metrics for delayed produce operations: one aggregate meter plus
 * lazily-created per-partition meters tagged with topic and partition.
 */
object DelayedProduceMetrics extends KafkaMetricsGroup {

  // Meter counting expirations across all partitions.
  private val allPartitionsExpirationMeter =
    newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)

  // Builds a per-partition meter carrying topic/partition tags.
  private val meterFactory = (tp: TopicPartition) =>
    newMeter(
      "ExpiresPerSec",
      "requests",
      TimeUnit.SECONDS,
      tags = Map("topic" -> tp.topic, "partition" -> tp.partition.toString))

  // Pool creating each partition's meter on first use.
  private val expirationMeterByPartition =
    new Pool[TopicPartition, Meter](valueFactory = Some(meterFactory))

  /** Record one expiration for the given partition, on both the aggregate and per-partition meters. */
  def recordExpiration(partition: TopicPartition): Unit = {
    allPartitionsExpirationMeter.mark()
    expirationMeterByPartition.getAndMaybePut(partition).mark()
  }
}