/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.streaming.kafka.lib.source.consumer

import java.nio.channels.ClosedByInterruptException
import java.util.concurrent.LinkedBlockingQueue

import kafka.common.TopicAndPartition
import org.slf4j.Logger

import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
import org.apache.gearpump.streaming.kafka.util.KafkaConfig
import org.apache.gearpump.util.LogUtil

object FetchThread {
  private val LOG: Logger = LogUtil.getLogger(classOf[FetchThread])

  val factory = new FetchThreadFactory

  class FetchThreadFactory extends java.io.Serializable {
    def getFetchThread(config: KafkaConfig, client: KafkaClient): FetchThread = {
      val fetchThreshold = config.getInt(KafkaConfig.FETCH_THRESHOLD_CONFIG)
      val fetchSleepMS = config.getLong(KafkaConfig.FETCH_SLEEP_MS_CONFIG)
      val startOffsetTime = config.getLong(KafkaConfig.CONSUMER_START_OFFSET_CONFIG)
      FetchThread(fetchThreshold, fetchSleepMS, startOffsetTime, client)
    }
  }

  def apply(
      fetchThreshold: Int,
      fetchSleepMS: Long,
      startOffsetTime: Long,
      client: KafkaClient): FetchThread = {
    val createConsumer = (tp: TopicAndPartition) =>
      client.createConsumer(tp.topic, tp.partition, startOffsetTime)
    val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
    val sleeper = new ExponentialBackoffSleeper(
      backOffMultiplier = 2.0,
      initialDurationMs = 100L,
      maximumDurationMs = 10000L)
    new FetchThread(createConsumer, incomingQueue, sleeper, fetchThreshold, fetchSleepMS)
  }
}
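
/*
 * A minimal usage sketch (hypothetical caller code, not part of this class):
 * it assumes a configured KafkaConfig and KafkaClient are already available
 * and exercises only the public surface defined in this file. `process` is
 * a placeholder for whatever the caller does with a message.
 *
 *   val fetchThread = FetchThread.factory.getFetchThread(config, client)
 *   fetchThread.setTopicAndPartitions(Array(TopicAndPartition("topic", 0)))
 *   fetchThread.start()
 *
 *   // the caller drains buffered messages from its own thread
 *   fetchThread.poll.foreach(process)
 *
 *   // interrupting the thread makes run() exit and close all consumers
 *   fetchThread.interrupt()
 */
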
/**
 * A thread that fetches messages from multiple kafka.common.TopicAndPartitions and puts
 * them onto a queue, which is asynchronously polled by a consumer.
 *
 * @param createConsumer given a kafka.common.TopicAndPartition, creates a [[KafkaConsumer]]
 *                       connected to it
 * @param incomingQueue a queue to buffer incoming messages
 * @param sleeper sleeps with exponential backoff when consumers need to be reset
 * @param fetchThreshold queue size above which the thread stops fetching messages
 * @param fetchSleepMS interval to sleep when there are no more messages or when
 *                     fetchThreshold is hit
 */
private[kafka] class FetchThread(
    createConsumer: TopicAndPartition => KafkaConsumer,
    incomingQueue: LinkedBlockingQueue[KafkaMessage],
    sleeper: ExponentialBackoffSleeper,
    fetchThreshold: Int,
    fetchSleepMS: Long) extends Thread {
  import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread._

  private var consumers: Map[TopicAndPartition, KafkaConsumer] =
    Map.empty[TopicAndPartition, KafkaConsumer]
  private var topicAndPartitions: Array[TopicAndPartition] =
    Array.empty[TopicAndPartition]
  private var nextOffsets = Map.empty[TopicAndPartition, Long]
  // set when a fetch fails; the next runLoop() pass recreates all consumers
  private var reset = false

  /** Assigns the partitions to fetch from and (re)creates one consumer per partition. */
  def setTopicAndPartitions(topicAndPartitions: Array[TopicAndPartition]): Unit = {
    this.topicAndPartitions = topicAndPartitions
    consumers = createAllConsumers
  }

  /** Sets the offset the consumer of `tp` should fetch from next, if such a consumer exists. */
  def setStartOffset(tp: TopicAndPartition, startOffset: Long): Unit = {
    consumers.get(tp).foreach(_.setStartOffset(startOffset))
  }

  /** Non-blocking; returns the next buffered message, or None if the queue is empty. */
  def poll: Option[KafkaMessage] = {
    Option(incomingQueue.poll())
  }

  override def run(): Unit = {
    try {
      while (!Thread.currentThread().isInterrupted) {
        runLoop()
      }
    } catch {
      case _: InterruptedException => LOG.info("fetch thread got interrupted exception")
      case _: ClosedByInterruptException => LOG.info("fetch thread closed by interrupt exception")
    } finally {
      consumers.values.foreach(_.close())
    }
  }

  private[lib] def runLoop(): Unit = {
    try {
      if (reset) {
        // mapValues returns a lazy view in Scala 2.x, so force a strict snapshot
        // here; otherwise getNextOffset would be re-evaluated later against
        // consumers that resetConsumers has already closed
        nextOffsets = consumers.map { case (tp, consumer) => tp -> consumer.getNextOffset }
        resetConsumers(nextOffsets)
        reset = false
      }
      val fetchMore: Boolean = fetchMessage
      sleeper.reset()
      if (!fetchMore) {
        // sleep for the configured duration when there is nothing to fetch
        sleeper.sleep(fetchSleepMS)
      }
    } catch {
      case exception: Exception =>
        LOG.warn(s"resetting consumers due to $exception")
        reset = true
        // sleep for an exponentially increasing duration
        sleeper.sleep()
    }
  }
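
  /*
   * Failure-handling note: with the defaults wired up in the companion object
   * (initialDurationMs = 100, backOffMultiplier = 2.0, maximumDurationMs = 10000),
   * and assuming ExponentialBackoffSleeper behaves as its parameter names suggest,
   * consecutive failing passes of runLoop() sleep roughly 100ms, 200ms, 400ms, ...
   * capped at 10s, until a successful pass calls sleeper.reset().
   */
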
  /**
   * Fetches at most one message from each TopicAndPartition in a round-robin manner,
   * unless the incoming queue has already reached fetchThreshold.
   *
   * @return true if any consumer had a message, i.e. whether fetching should continue
   *         without sleeping
   */
  private def fetchMessage: Boolean = {
    if (incomingQueue.size >= fetchThreshold) {
      false
    } else {
      // the accumulator records whether any consumer produced a message this pass
      consumers.foldLeft(false) { (fetched, tpAndConsumer) =>
        val (_, consumer) = tpAndConsumer
        if (consumer.hasNext) {
          incomingQueue.put(consumer.next())
          true
        } else {
          fetched
        }
      }
    }
  }
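
  /*
   * Capacity note (derived from the check above): fetchThreshold is only
   * tested once per pass, and a pass may enqueue up to one message per
   * partition, so the queue can briefly grow to
   * fetchThreshold - 1 + consumers.size entries before fetching pauses.
   */
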
  private def createAllConsumers: Map[TopicAndPartition, KafkaConsumer] = {
    topicAndPartitions.map(tp => tp -> createConsumer(tp)).toMap
  }

  private def resetConsumers(nextOffsets: Map[TopicAndPartition, Long]): Unit = {
    consumers.values.foreach(_.close())
    consumers = createAllConsumers
    consumers.foreach { case (tp, consumer) =>
      consumer.setStartOffset(nextOffsets(tp))
    }
  }
}