/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.connector

import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.blocking
import scala.concurrent.duration._
import scala.util.Failure

import org.apache.kafka.clients.consumer.CommitFailedException

import akka.actor.FSM
import akka.pattern.pipe

import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.common.TransactionId
trait MessageConsumer {

  /** The maximum number of messages peeked (i.e., max number of messages retrieved during a long poll). */
  val maxPeek: Int

  /**
   * Gets messages via a long poll. May or may not remove messages
   * from the message connector. Use commit() to ensure messages are
   * removed from the connector.
   *
   * @param duration for the long poll
   * @param retry number of retries to attempt if the operation fails
   * @return iterable collection (topic, partition, offset, bytes)
   */
  def peek(duration: FiniteDuration, retry: Int = 3): Iterable[(String, Int, Long, Array[Byte])]

  /**
   * Commits offsets from the last peek operation to ensure they are removed
   * from the connector.
   *
   * @param retry number of retries to attempt if the commit fails
   */
  def commit(retry: Int = 3): Unit

  /** Closes the consumer. */
  def close(): Unit
}
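
/*
 * A minimal usage sketch of the contract above (the `consumer` value and the `process`
 * function are assumptions for illustration, not part of this file):
 *
 *   val records = consumer.peek(1.second)   // long poll for up to consumer.maxPeek records
 *   consumer.commit()                       // commit immediately, i.e. at-most-once delivery
 *   records.foreach { case (_, _, _, bytes) => process(bytes) }
 *
 * MessageFeed below drives this peek/commit cycle asynchronously.
 */
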
object MessageFeed {
  protected sealed trait FeedState
  protected[connector] case object Idle extends FeedState
  protected[connector] case object FillingPipeline extends FeedState
  protected[connector] case object DrainingPipeline extends FeedState

  protected sealed trait FeedData
  private case object NoData extends FeedData

  /** Indicates the consumer is ready to accept messages from the message bus for processing. */
  object Ready

  /** Steady state message, indicates capacity in downstream process to receive more messages. */
  object Processed

  /** Indicates the fill operation has completed. */
  private case class FillCompleted(messages: Seq[(String, Int, Long, Array[Byte])])
}
/**
* This actor polls the message bus for new messages and dispatches them to the given
* handler. The actor tracks the number of messages dispatched and will not dispatch new
* messages until some number of them are acknowledged.
*
* This is used by the invoker to pull messages from the message bus and apply back pressure
* when the invoker does not have resources to complete processing messages (i.e., no containers
* are available to run new actions). It is also used in the load balancer to consume active
* ack messages.
* When the invoker releases resources (by reclaiming containers) it will send a message
* to this actor which will then attempt to fill the pipeline with new messages.
*
* The actor tries to fill the pipeline with additional messages while the number
* of outstanding requests is below the pipeline fill threshold.
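 *
 * A hedged wiring sketch (the `system`, `logging`, `consumer`, and `process` values are
 * assumptions for illustration; parameter values are arbitrary):
 * {{{
 *   val handler: Array[Byte] => Future[Unit] = bytes => Future { process(bytes) }
 *   val feed = system.actorOf(Props(new MessageFeed(
 *     "example", logging, consumer, maximumHandlerCapacity = 128, 1.second, handler)))
 *   // the downstream processor signals free capacity by sending MessageFeed.Processed to the feed
 * }}}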
*/
@throws[IllegalArgumentException]
class MessageFeed(description: String,
                  logging: Logging,
                  consumer: MessageConsumer,
                  maximumHandlerCapacity: Int,
                  longPollDuration: FiniteDuration,
                  handler: Array[Byte] => Future[Unit],
                  autoStart: Boolean = true,
                  logHandoff: Boolean = true)
    extends FSM[MessageFeed.FeedState, MessageFeed.FeedData] {
  import MessageFeed._

  // double-buffer to make up for message bus read overhead
  val maxPipelineDepth = maximumHandlerCapacity * 2
  private val pipelineFillThreshold = maxPipelineDepth - consumer.maxPeek
  require(
    consumer.maxPeek <= maxPipelineDepth,
    "consumer may not yield more messages per peek than permitted by max depth")

  // Immutable queue:
  // although on the surface it seems to make sense to use an immutable variable with a mutable Queue,
  // Akka actor state defies the usual "prefer immutable" guideline in Scala, especially with collections.
  // If, for some reason, this queue were mutable and were accidentally leaked in, say, an Akka message,
  // another actor or recipient would be able to mutate the internal state of this actor.
  // Best practice therefore dictates a mutable variable pointing at an immutable collection.
  private var outstandingMessages = immutable.Queue.empty[(String, Int, Long, Array[Byte])]
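  // For example, `outstandingMessages = outstandingMessages ++ newRecords` (a hypothetical batch)
  // rebinds the var to a brand new immutable queue; any reference to the old queue held elsewhere
  // can neither observe nor mutate this actor's updated state.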
  private var handlerCapacity = maximumHandlerCapacity

  private implicit val tid = TransactionId.dispatcher

  logging.info(
    this,
    s"handler capacity = $maximumHandlerCapacity, pipeline fill at = $pipelineFillThreshold, pipeline depth = $maxPipelineDepth")

  when(Idle) {
    case Event(Ready, _) =>
      fillPipeline()
      goto(FillingPipeline)
    case _ => stay
  }

  // wait for fill to complete, and keep filling if there is
  // capacity, otherwise wait to drain
  when(FillingPipeline) {
    case Event(Processed, _) =>
      updateHandlerCapacity()
      sendOutstandingMessages()
      stay

    case Event(FillCompleted(messages), _) =>
      outstandingMessages = outstandingMessages ++ messages
      sendOutstandingMessages()
      if (shouldFillQueue()) {
        fillPipeline()
        stay
      } else {
        goto(DrainingPipeline)
      }

    case _ => stay
  }

  when(DrainingPipeline) {
    case Event(Processed, _) =>
      updateHandlerCapacity()
      sendOutstandingMessages()
      if (shouldFillQueue()) {
        fillPipeline()
        goto(FillingPipeline)
      } else stay

    case _ => stay
  }

  onTransition { case _ -> Idle => if (autoStart) self ! Ready }

  startWith(Idle, MessageFeed.NoData)
  initialize()
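
  // State flow (summary of the transitions above): Idle --Ready--> FillingPipeline; a completed fill
  // keeps the feed in FillingPipeline while the queue is at or below pipelineFillThreshold, otherwise
  // it moves to DrainingPipeline; Processed messages free capacity and move it back to FillingPipeline
  // once the queue has drained to the threshold.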

  private implicit val ec = context.system.dispatchers.lookup("dispatchers.kafka-dispatcher")

  private def fillPipeline(): Unit = {
    if (outstandingMessages.size <= pipelineFillThreshold) {
      Future {
        blocking {
          // Grab next batch of messages and commit offsets immediately
          // essentially marking the activation as having satisfied "at most once"
          // semantics (this is the point at which the activation is considered started).
          // If the commit fails, then messages peeked are peeked again on the next poll.
          // While the commit is synchronous and will block until it completes, at steady
          // state with enough buffering (i.e., maxPipelineDepth > maxPeek), the latency
          // of the commit should be masked.
          val records = consumer.peek(longPollDuration)
          consumer.commit()
          FillCompleted(records.toSeq)
        }
      }.andThen {
          case Failure(e: CommitFailedException) =>
            logging.error(this, s"failed to commit $description consumer offset: $e")
          case Failure(e: Throwable) => logging.error(this, s"exception while pulling new $description records: $e")
        }
        .recover {
          case _ => FillCompleted(Seq.empty)
        }
        .pipeTo(self)
    } else {
      logging.error(this, s"dropping fill request until $description feed is drained")
    }
  }

  /** Send as many messages as possible to the handler. */
  @tailrec
  private def sendOutstandingMessages(): Unit = {
    val occupancy = outstandingMessages.size
    if (occupancy > 0 && handlerCapacity > 0) {
      // Easiest way to cleanly dequeue from an immutable queue:
      // the head (first element) is destructured with a pattern assignment, and the
      // var is then rebound to the tail (everything but the first element).
      val (topic, partition, offset, bytes) = outstandingMessages.head
      outstandingMessages = outstandingMessages.tail

      if (logHandoff) logging.debug(this, s"processing $topic[$partition][$offset] ($occupancy/$handlerCapacity)")
      handler(bytes)
      handlerCapacity -= 1

      sendOutstandingMessages()
    }
  }

  private def shouldFillQueue(): Boolean = {
    val occupancy = outstandingMessages.size
    if (occupancy <= pipelineFillThreshold) {
      logging.debug(
        this,
        s"$description pipeline has capacity: $occupancy <= $pipelineFillThreshold ($handlerCapacity)")
      true
    } else {
      logging.debug(this, s"$description pipeline must drain: $occupancy > $pipelineFillThreshold")
      false
    }
  }

  private def updateHandlerCapacity(): Int = {
    logging.debug(self, s"$description received processed msg, current capacity = $handlerCapacity")
    if (handlerCapacity < maximumHandlerCapacity) {
      handlerCapacity += 1
      handlerCapacity
    } else {
      if (handlerCapacity > maximumHandlerCapacity) logging.error(self, s"$description capacity already at max")
      maximumHandlerCapacity
    }
  }
}