* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package whisk.connector.kafka
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{RetriableException, WakeupException}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import pureconfig.loadConfigOrThrow
import whisk.common.{Logging, LoggingMarkers, MetricEmitter, Scheduler}
import whisk.connector.kafka.KafkaConfiguration._
import whisk.core.ConfigKeys
import whisk.core.connector.MessageConsumer
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.{blocking, ExecutionContext, Future}
case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: Int)
class KafkaConsumerConnector(
kafkahost: String,
groupid: String,
topic: String,
override val maxPeek: Int = Int.MaxValue)(implicit logging: Logging, actorSystem: ActorSystem)
extends MessageConsumer {
implicit val ec: ExecutionContext = actorSystem.dispatcher
private val gracefulWaitTime = 100.milliseconds
// The consumer is generally configured via getProps. This configuration only loads values necessary for "outer"
// logic, like the wakeup timer.
private val cfg = loadConfigOrThrow[KafkaConsumerConfig](ConfigKeys.kafkaConsumer)
// Currently consumed offset, is used to calculate the topic lag.
// It is updated from one thread in "peek", no concurrent data structure is necessary
private var offset: Long = 0
// Markers for metrics, initialized only once
private val queueMetric = LoggingMarkers.KAFKA_QUEUE(topic)
private val delayMetric = LoggingMarkers.KAFKA_MESSAGE_DELAY(topic)
* Long poll for messages. Method returns once message are available but no later than given
* duration.
* @param duration the maximum duration for the long poll
override def peek(duration: FiniteDuration = 500.milliseconds,
retry: Int = 3): Iterable[(String, Int, Long, Array[Byte])] = {
// poll can be infinitely blocked in edge-cases, so we need to wakeup explicitly.
val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup())
try {
val response = consumer.poll(duration.toMillis).asScala
val now = System.currentTimeMillis
response.lastOption.foreach(record => offset = record.offset + 1) { r =>
// record the time between producing the message and reading it
MetricEmitter.emitHistogramMetric(delayMetric, (now - r.timestamp).max(0))
(r.topic, r.partition, r.offset, r.value)
} catch {
// Happens if the peek hangs.
case _: WakeupException if retry > 0 =>
logging.error(this, s"poll timeout occurred. Retrying $retry more times.")
Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `poll` is blocking anyway
peek(duration, retry - 1)
case e: RetriableException if retry > 0 =>
logging.error(this, s"$e: Retrying $retry more times")
Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `poll` is blocking anyway
peek(duration, retry - 1)
// Every other error results in a restart of the consumer
case e: Throwable =>
throw e
} finally wakeUpTask.cancel()
* Commits offsets from last poll.
def commit(retry: Int = 3): Unit =
try {
} catch {
case e: RetriableException =>
if (retry > 0) {
logging.error(this, s"$e: retrying $retry more times")
Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `commitSync` is blocking anyway
commit(retry - 1)
} else {
throw e
override def close(): Unit = {
consumer.close(), s"closing '$topic' consumer")
/** Creates a new kafka consumer and subscribes to topic list if given. */
private def createConsumer(topic: String) = {
val config = Map(
ConsumerConfig.GROUP_ID_CONFIG -> groupid,
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkahost,
ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPeek.toString) ++
configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaConsumer))
val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
private def recreateConsumer(): Unit = {
val oldConsumer = consumer
oldConsumer.close(), s"old consumer closed")
consumer = createConsumer(topic)
@volatile private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = createConsumer(topic)
// Read current lag of the consumed topic, e.g. invoker queue
// Since we use only one partition in kafka, it is defined 0
Scheduler.scheduleWaitAtMost(cfg.metricFlushIntervalS.seconds, 10.seconds, "kafka-lag-monitor") { () =>
Future {
blocking {
if (offset > 0) {
val topicAndPartition = new TopicPartition(topic, 0)
consumer.endOffsets(Set(topicAndPartition).asJava).asScala.get(topicAndPartition).foreach { endOffset =>
// endOffset could lag behind the offset reported by the consumer internally resulting in negative numbers
val queueSize = (endOffset - offset).max(0)
MetricEmitter.emitHistogramMetric(queueMetric, queueSize)