blob: 901293a61790ccfb52cd8612c3dc261f0d681eca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.loadBalancer
import akka.actor.ActorRef
import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.LongAdder
import akka.actor.ActorSystem
import akka.event.Logging.InfoLevel
import org.apache.kafka.clients.producer.RecordMetadata
import pureconfig._
import pureconfig.generic.auto._
import org.apache.openwhisk.common.LoggingMarkers._
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
/**
 * Abstract class which provides common logic for all LoadBalancer implementations.
 *
 * It keeps the book-keeping of in-flight activations (slots, promises of blocking invocations and
 * counters used for metrics), posts activations to invokers through a message producer, and consumes
 * the acknowledgement messages sent back by the invokers.
 *
 * @param config the configuration handed to the messaging provider when the producer is created
 * @param feedFactory creates the feed that delivers acknowledgement messages to this load balancer
 * @param controllerInstance the controller instance used to tag the emitted metrics
 * @param actorSystem provides the dispatcher used as execution context and the scheduler used for timers
 * @param logging the logger used by this load balancer
 * @param messagingProvider creates the message producer and is handed to the feed factory
 */
abstract class CommonLoadBalancer(config: WhiskConfig,
feedFactory: FeedFactory,
controllerInstance: ControllerInstanceId)(implicit
val actorSystem: ActorSystem,
logging: Logging,
messagingProvider: MessagingProvider)
extends LoadBalancer {
// All futures and timers of this class run on the actor system's dispatcher.
protected implicit val executionContext: ExecutionContext = actorSystem.dispatcher
// Load balancer settings; timeoutFactor and timeoutAddon are used for the completion ack timeout.
val lbConfig: ShardingContainerPoolBalancerConfig =
loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer)
// Supplied by the concrete load balancer; receives an InvocationFinishedMessage for finished invocations.
protected val invokerPool: ActorRef
/** State related to invocations and throttling */
// One entry per in-flight activation; added in setupActivation, removed in processCompletion.
protected[loadBalancer] val activationSlots = TrieMap[ActivationId, ActivationEntry]()
// One promise per blocking in-flight activation; completed by the result ack or failed on a forced completion.
protected[loadBalancer] val activationPromises =
TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
// Number of in-flight activations per namespace UUID.
protected val activationsPerNamespace = TrieMap[UUID, LongAdder]()
// Number of in-flight activations overall.
protected val totalActivations = new LongAdder()
// Sum of the memory limits (in MB) of the in-flight blackbox activations.
protected val totalBlackBoxActivationMemory = new LongAdder()
// Sum of the memory limits (in MB) of the in-flight managed (non-blackbox) activations.
protected val totalManagedActivationMemory = new LongAdder()
/**
 * Emits the gauges describing the current in-flight load of this controller: the number of in-flight
 * activations and the in-flight memory (overall, blackbox only and managed only).
 */
protected def emitMetrics() = {
  // Read every counter exactly once. The counters are updated concurrently, so reading them again for
  // each gauge could report a total that does not match the blackbox and managed gauges of the same round.
  val inflightActivations = totalActivations.longValue
  val blackboxMemory = totalBlackBoxActivationMemory.longValue
  val managedMemory = totalManagedActivationMemory.longValue

  MetricEmitter.emitGaugeMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), inflightActivations)
  MetricEmitter.emitGaugeMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""), blackboxMemory + managedMemory)
  MetricEmitter.emitGaugeMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, "Blackbox"), blackboxMemory)
  MetricEmitter.emitGaugeMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, "Managed"), managedMemory)
}
// Emit the in-flight gauges every 10 seconds, the first time 10 seconds after this class is constructed.
// The handle returned by the scheduler is not kept here, so this class does not cancel the task itself.
actorSystem.scheduler.scheduleAtFixedRate(10.seconds, 10.seconds)(() => emitMetrics())
/**
 * Reports how many activations of the given namespace are currently in flight.
 *
 * @param namespace the UUID of the namespace to look up
 * @return an already completed future holding the count, 0 if the namespace has no counter yet
 */
override def activeActivationsFor(namespace: UUID): Future[Int] = {
  val inflight = activationsPerNamespace.get(namespace).fold(0)(_.intValue)
  Future.successful(inflight)
}
/** Reports, as an already completed future, how many activations are currently in flight overall. */
override def totalActiveActivations: Future[Int] = {
  val inflight: Int = totalActivations.intValue
  Future.successful(inflight)
}
/**
 * Computes how long the load balancer waits for the completion ack of an activation.
 *
 * The starting point is the action's time limit, raised to the standard action time limit if it is
 * shorter, so that short limits do not produce overly tight timeouts.
 *
 * That base is multiplied by the configured timeout factor. The factor controls how much slack the system
 * tolerates before activations are reported as failed, e.g. to cover a cold start whose init takes as
 * long as the run itself.
 *
 * Finally the configured addon is added to be lenient towards general delays and wait times.
 *
 * @param actionTimeLimit the action's time limit
 * @return the duration within which a completion ack must be received
 */
private def calculateCompletionAckTimeout(actionTimeLimit: FiniteDuration): FiniteDuration = {
  val baseTimeout = actionTimeLimit.max(TimeLimit.STD_DURATION)
  val dilutedTimeout = baseTimeout * lbConfig.timeoutFactor
  dilutedTimeout + lbConfig.timeoutAddon
}
/**
 * 2. Record the scheduled activation in the local state.
 *
 * Every activation gets an entry in the activationSlots map. Blocking invocations additionally get a
 * promise in the activationPromises map; the result ack completes that promise so the waiting caller
 * receives the result. The activationSlots entry is removed once the completion ack arrives, or when the
 * timeout installed here fires first.
 *
 * @param msg the activation message that is about to be sent
 * @param action the action being invoked; provides the memory, time and concurrency limits
 * @param instance the invoker the activation was scheduled to
 * @return for blocking invocations the future of the stored promise, otherwise an already completed
 *         future holding the activation id
 */
protected def setupActivation(msg: ActivationMessage,
                              action: ExecutableWhiskActionMetaData,
                              instance: InvokerInstanceId): Future[Either[ActivationId, WhiskActivation]] = {
  val activationId = msg.activationId
  val namespaceId = msg.user.namespace.uuid

  // Counters needed for emitting metrics and for the per-namespace in-flight count.
  totalActivations.increment()
  val blackbox = action.exec.pull
  val memoryMB = action.limits.memory.megabytes
  val memoryCounter = if (blackbox) totalBlackBoxActivationMemory else totalManagedActivationMemory
  memoryCounter.add(memoryMB)
  activationsPerNamespace.getOrElseUpdate(namespaceId, new LongAdder()).increment()

  // The completion ack has to arrive within this duration.
  val ackDeadline = calculateCompletionAckTimeout(action.limits.timeout.duration)

  // Non-blocking callers only get the activation id back. Blocking callers get the future of a promise
  // that is completed later, once the result ack arrives.
  val eventualResult: Future[Either[ActivationId, WhiskActivation]] =
    if (!msg.blocking) Future.successful(Left(activationId))
    else activationPromises.getOrElseUpdate(activationId, Promise[Either[ActivationId, WhiskActivation]]()).future

  // Builds the slot entry together with its timeout handler. The handler covers the case where the
  // completion ack never arrives (e.g. the invoker is down or the message bus is disrupted) or arrives
  // far too late; it then forces the completion so the books are updated.
  //
  // Attention: a late completion ack means the invoker is still busy, or will be, with this activation.
  // Releasing the activation's memory in the invoker book-keeping lets the load balancer send more
  // activations to that invoker, which can overload it so that activations have to wait for others.
  def newEntry(): ActivationEntry = {
    val timeoutHandler = actorSystem.scheduler.scheduleOnce(ackDeadline) {
      processCompletion(activationId, msg.transid, forced = true, isSystemError = false, instance = instance)
    }

    // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
    ActivationEntry(
      activationId,
      namespaceId,
      instance,
      memoryMB.MB,
      action.limits.timeout.duration,
      action.limits.concurrency.maxConcurrent,
      action.fullyQualifiedName(true),
      timeoutHandler,
      blackbox,
      msg.blocking)
  }

  // The entry (and with it the timer) is only created if no slot is registered for this activation yet.
  activationSlots.getOrElseUpdate(activationId, newEntry())

  eventualResult
}
// Producer obtained from the messaging provider for this controller's configuration.
// NOTE(review): the second argument is presumably the maximum message size the producer accepts
// (set to the maximum activation limit) — confirm against MessagingProvider.getProducer.
protected val messageProducer =
messagingProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
/**
 * 3. Send the activation to the invoker.
 *
 * The message is posted to the invoker's own topic, whose name is built from the controller's topic
 * prefix and the invoker's number. The start, success and failure of the post are logged under the
 * activation's transaction id.
 *
 * @param producer the producer used to post the message
 * @param msg the activation message to send
 * @param invoker the invoker that should run the activation
 * @return the future of the post, completed with the record metadata reported by the producer
 */
protected def sendActivationToInvoker(producer: MessageProducer,
                                      msg: ActivationMessage,
                                      invoker: InvokerInstanceId): Future[RecordMetadata] = {
  implicit val transid: TransactionId = msg.transid

  val topic = s"${Controller.topicPrefix}invoker${invoker.toInt}"
  MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)

  val startMarker = transid.started(
    this,
    LoggingMarkers.CONTROLLER_KAFKA,
    s"posting topic '$topic' with activation id '${msg.activationId}'",
    logLevel = InfoLevel)

  val eventualMetadata = producer.send(topic, msg)
  // Only log the outcome; the returned future still carries the original result or failure.
  eventualMetadata.andThen {
    case Failure(_) =>
      transid.failed(this, startMarker, s"error on posting to topic $topic")
    case Success(metadata) =>
      transid.finished(
        this,
        startMarker,
        s"posted to ${metadata.topic()}[${metadata.partition()}][${metadata.offset()}]",
        logLevel = InfoLevel)
  }
}
/**
 * Subscribes to ack messages from the invokers (result / completion) and registers a handler for these messages.
 *
 * The feed is created by the feed factory with processAcknowledgement as the handler; the handler reports
 * back to this actor with MessageFeed.Processed.
 */
private val activationFeed: ActorRef =
feedFactory.createFeed(actorSystem, messagingProvider, processAcknowledgement)
/**
 * 4. Get the ack message and parse it.
 *
 * The bytes are decoded as UTF-8 and parsed into an acknowledgement message. If the message names an
 * instance whose slot is free, the completion is processed; if it carries a result, the result is
 * processed. A message that cannot be parsed is logged together with the parse error.
 *
 * The feed is told that the message was processed in every case, including when handling it throws.
 * Otherwise a single failing message would never be reported as processed to the feed.
 *
 * @param bytes the raw acknowledgement message as received from the message bus
 * @return a future that completes once the message is handled; it fails if handling the message throws
 */
protected[loadBalancer] def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = Future {
  try {
    val raw = new String(bytes, StandardCharsets.UTF_8)
    AcknowledegmentMessage.parse(raw) match {
      case Success(acknowledegment) =>
        // A completion ack names the instance whose slot became free.
        acknowledegment.isSlotFree.foreach { instance =>
          processCompletion(
            acknowledegment.activationId,
            acknowledegment.transid,
            forced = false,
            isSystemError = acknowledegment.isSystemError.getOrElse(false),
            instance)
        }

        // A result ack carries the response for a waiting caller.
        acknowledegment.result.foreach { response =>
          processResult(acknowledegment.activationId, acknowledegment.transid, response)
        }

      case Failure(t) =>
        // Include the cause so that malformed messages can be diagnosed from the log.
        logging.error(this, s"failed processing message: $raw with $t")
    }
  } finally {
    // Always report back to the feed, even if decoding or processing the message threw.
    activationFeed ! MessageFeed.Processed
  }
}
/**
 * 5. Process the result ack and return it to the user.
 *
 * Completes the promise stored for the activation, if there is one, with the received response. The
 * activation's slot is not touched here; it is removed when the completion ack is processed, because
 * the invoker's slot is not free for new activations before that.
 *
 * @param aid the id of the activation the result belongs to
 * @param tid the transaction id used for logging
 * @param response the activation result, or only the activation id
 */
protected def processResult(aid: ActivationId,
                            tid: TransactionId,
                            response: Either[ActivationId, WhiskActivation]): Unit = {
  // Hand the result to the waiting caller, if a promise is still registered for this activation.
  for (pending <- activationPromises.remove(aid)) {
    pending.trySuccess(response)
  }
  logging.info(this, s"received result ack for '$aid'")(tid)
}
/**
 * Hook for the concrete load balancer; called by processCompletion after an activation's entry was
 * removed from the activation slots, if the acknowledging instance is an invoker.
 *
 * @param invoker the invoker the completion was reported for
 * @param entry the entry of the finished activation that was removed from the activation slots
 */
protected def releaseInvoker(invoker: InvokerInstanceId, entry: ActivationEntry): Unit
// Singletons for counter metrics related to completion acks
// Counted when a completion ack arrives for an activation that is still tracked.
protected val LOADBALANCER_COMPLETION_ACK_REGULAR =
LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularCompletionAck)
// Counted when the timeout forces the completion of an activation that is still tracked.
protected val LOADBALANCER_COMPLETION_ACK_FORCED =
LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedCompletionAck)
// Counted when an untracked completion arrives under the invoker health transaction id.
protected val LOADBALANCER_COMPLETION_ACK_HEALTHCHECK =
LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, HealthcheckCompletionAck)
// Counted when a completion ack arrives for an activation that is no longer tracked.
protected val LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED =
LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularAfterForcedCompletionAck)
// Counted when the timeout fires for an activation that is no longer tracked.
protected val LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR =
LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedAfterRegularCompletionAck)
/**
 * 6. Process the completion ack and update the state.
 *
 * Removes the activation from the activation slots and, if it was still tracked, updates the counters,
 * releases the invoker and informs the invoker pool about the outcome. Completions for activations that
 * are not tracked (health actions, or completions racing with the timeout) are only logged and counted;
 * for health actions the invoker pool is informed as well.
 *
 * @param aid the id of the completed activation
 * @param tid the transaction id used for logging
 * @param forced true if the completion is triggered by the timeout instead of a received ack
 * @param isSystemError true if the invoker reported a system error for the activation
 * @param instance the instance the completion relates to
 */
protected[loadBalancer] def processCompletion(aid: ActivationId,
                                              tid: TransactionId,
                                              forced: Boolean,
                                              isSystemError: Boolean,
                                              instance: InstanceId): Unit = {
  // Only invoker instances are released and reported to the invoker pool.
  val invoker = Option(instance).collect { case i: InvokerInstanceId => i }

  // A forced completion is a timeout. Otherwise a system error is reported as such and everything else
  // counts as success; a Left response is generally a success too, since that could be a message not
  // fitting into Kafka.
  val invocationResult =
    if (forced) InvocationFinishedResult.Timeout
    else if (isSystemError) InvocationFinishedResult.SystemError
    else InvocationFinishedResult.Success

  // Informs the invoker pool supervisor about the outcome of the invocation.
  def reportToInvokerPool(): Unit =
    invoker.foreach(i => invokerPool ! InvocationFinishedMessage(i, invocationResult))

  activationSlots.remove(aid) match {
    case Some(entry) =>
      // The activation was still tracked: give back what setupActivation accounted for.
      totalActivations.decrement()
      val memoryCounter = if (entry.isBlackbox) totalBlackBoxActivationMemory else totalManagedActivationMemory
      memoryCounter.add(-entry.memoryLimit.toMB)
      activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
      invoker.foreach(releaseInvoker(_, entry))

      if (forced) {
        // The entry has timed out; if a promise is still registered for it, remove it as well and
        // fail it so that a waiting caller is released.
        activationPromises
          .remove(aid)
          .foreach(_.tryFailure(new Throwable("no completion or active ack received yet")))

        val actionKind = if (entry.isBlackbox) "blackbox" else "managed"
        val blockingKind = if (entry.isBlocking) "blocking" else "non-blocking"
        val ackTimeout = calculateCompletionAckTimeout(entry.timeLimit)
        logging.warn(
          this,
          s"forced completion ack for '$aid', action '${entry.fullyQualifiedEntityName}' ($actionKind), $blockingKind, mem limit ${entry.memoryLimit.toMB} MB, time limit ${entry.timeLimit.toMillis} ms, completion ack timeout $ackTimeout from $instance")(
          tid)
        MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED)
      } else {
        entry.timeoutHandler.cancel()
        // The activationPromises map is deliberately not touched here: the result ack is expected to
        // arrive as usual, and processing it removes the promise from the map.
        logging.info(this, s"received completion ack for '$aid', system error=$isSystemError")(tid)
        MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR)
      }

      // Completions handled here are strictly from user actions, since health actions are not part of
      // the load balancer's activation map.
      reportToInvokerPool()

    case None =>
      if (tid == TransactionId.invokerHealth) {
        // Health actions have no ActivationEntry because they are written to the message bus directly.
        // Their result still has to reach the invoker pool, which uses it to decide whether the invoker
        // can be considered healthy again.
        logging.info(this, s"received completion ack for health action on $instance")(tid)
        MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_HEALTHCHECK)
        reportToInvokerPool()
      } else if (!forced) {
        // The entry was already removed by a timeout (forced ack). The result is ignored because the
        // timeout has already been reported to the invoker pool. This is logged as a warning because the
        // invoker did process the activation and sent a completion message, just not in time.
        logging.warn(
          this,
          s"received completion ack for '$aid' from $instance which has no entry, system error=$isSystemError")(tid)
        MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED)
      } else {
        // The entry was already removed by a completion ack. The timeout reaches this point when it
        // fired at roughly the same time, before the completion ack could cancel the timer. The
        // completion has been processed already, so nothing else has to be done.
        logging.debug(this, s"forced completion ack for '$aid' which has no entry")(tid)
        MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR)
      }
  }
}
}