package org.apache.openwhisk.core.loadBalancer
import java.util.concurrent.ThreadLocalRandom
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Props}
import akka.stream.ActorMaterializer
import akka.cluster.ClusterEvent._
import akka.cluster.{Cluster, Member, MemberStatus}
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
import pureconfig._
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.WhiskConfig._
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size.SizeLong
import org.apache.openwhisk.common.LoggingMarkers._
import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.spi.SpiLoader
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
* A loadbalancer that schedules workload based on a hashing-algorithm.
* ## Algorithm
* First, a hash is calculated for every namespace + action pair, and an invoker is picked based on that hash
* (`hash % numInvokers`). The resulting index is the so-called "home invoker": the invoker where the following
* progression will **always** start. If this invoker is healthy (see "Invoker health checking") and has capacity
* (see "Capacity checking"), the request is scheduled to it.
* If either prerequisite is not met, the index is incremented by a step size. The available step sizes are the numbers
* up to the number of invokers that are coprime to that number and pairwise coprime to each other (coprime, to minimize
* collisions while progressing through the invokers: a step coprime to the invoker count visits every invoker once
* before repeating). The step size is picked by the same hash calculated above (`hash % numStepSizes`). The
* home-invoker index is then incremented by the step size, and the checks (healthy + capacity) are repeated on the
* invoker we land on.
* This procedure is repeated until all invokers have been checked. At that point the "overload" strategy is employed,
* which chooses a healthy invoker randomly. In a steadily running system, overload means that no invoker has capacity
* left for the current request.
* If no invokers are available or if there are no healthy invokers in the system, the loadbalancer will return an error
* stating that no invokers are available to take any work. Requests are not queued anywhere in this case.
* An example:
* - availableInvokers: 10 (all healthy)
* - hash: 13
* - homeInvoker: hash % availableInvokers = 13 % 10 = 3
* - stepSizes: 1, 3, 7 (multiples of 2 and 5 are excluded because they are not coprime to 10, and 9 is excluded because it is not coprime to 3)
* - stepSizeIndex: hash % numStepSizes = 13 % 3 = 1 => stepSize = 3
* Progression to check the invokers: 3, 6, 9, 2, 5, 8, 1, 4, 7, 0 --> done
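The worked example above can be reproduced with a short Scala sketch. It is illustrative only. `ProgressionSketch` is a hypothetical name, and the hash (13) is passed in directly instead of being computed from a namespace and action.

```scala
// Sketch of the worked example: hash -> home invoker -> step size -> progression.
// Assumption: the (non-negative) hash is given directly instead of being computed.
object ProgressionSketch {
  @scala.annotation.tailrec
  def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)

  // numbers in 1..n that are coprime to n and pairwise coprime to each other
  def stepSizes(n: Int): IndexedSeq[Int] =
    (1 to n).foldLeft(IndexedSeq.empty[Int]) { (acc, cur) =>
      if (gcd(cur, n) == 1 && acc.forall(i => gcd(i, cur) == 1)) acc :+ cur else acc
    }

  // order in which the invokers are checked, starting at the home invoker
  def progression(hash: Int, numInvokers: Int): IndexedSeq[Int] = {
    val steps = stepSizes(numInvokers)
    val home = hash % numInvokers
    val step = steps(hash % steps.size)
    (0 until numInvokers).map(i => (home + i * step) % numInvokers)
  }
}
```

Because the chosen step size is coprime to the number of invokers, the progression reaches every invoker exactly once before it would wrap around to the home invoker.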
* This heuristic is based on the assumption that the chance of getting a warm container is highest on the home
* invoker and degrades with every step taken. The hashing ensures that all loadbalancers in a cluster always pick the
* same home invoker and follow the same progression for a given action.
* Known caveats:
* - This assumption does not always hold. For instance, two heavy workloads landing on the same invoker can displace
* each other. This causes many cold starts, because the invoker evicts the containers of one workload to make space
* for the other. Future work could keep a buffer of the invokers each action was last scheduled to and prefer the most
* recent one, then the second most recent, and so forth.
* ## Capacity checking
* The maximum capacity per invoker is configured using `user-memory`, which is the maximum total memory of the actions
* running in parallel on that invoker.
* Spare capacity is determined by what the loadbalancer thinks it has scheduled to each invoker. Upon scheduling, an
* entry is made to update the books, and one slot per MB of the action's memory limit is taken from a Semaphore. These
* slots are only released after the response from the invoker (active-ack) arrives **or** after the active-ack times out.
* The Semaphore has as many slots as MBs are configured in `user-memory`.
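A minimal sketch of this bookkeeping, assuming one `java.util.concurrent.Semaphore` permit per MB of `user-memory`. `CapacitySketch` and `InvokerCapacity` are hypothetical names. The balancer itself uses its own `NestedSemaphore`, which also tracks concurrency.

```scala
import java.util.concurrent.Semaphore

// Hypothetical model of one invoker's capacity as seen by one loadbalancer.
object CapacitySketch {
  final class InvokerCapacity(userMemoryMB: Int) {
    // one permit per MB of user memory
    private val slots = new Semaphore(userMemoryMB)

    // on scheduling: take one slot per MB of the action's memory limit, without blocking
    def tryReserve(actionMemoryMB: Int): Boolean = slots.tryAcquire(actionMemoryMB)

    // on active-ack (or active-ack timeout): give the slots back
    def release(actionMemoryMB: Int): Unit = slots.release(actionMemoryMB)

    def freeMB: Int = slots.availablePermits()
  }
}
```

A failed `tryReserve` is what sends the scheduler on to the next invoker in the progression.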
* Known caveats:
* - In an overload scenario, activations are queued directly to the invokers, which makes the active-ack timeout
* unpredictable. Timing out active-acks in that case can cause the loadbalancer to prematurely assign new load to an
* overloaded invoker, which can cause uneven queues.
* - The same is true if an invoker is extraordinarily slow in processing activations. If it still sends pings but
* handles its load so slowly that active-acks time out, the queue on this invoker slowly grows, while the loadbalancer
* again thinks there is capacity when there is none.
* Both caveats could be solved in future work by not queueing to invoker topics on overload, and queueing on a
* centralized overflow topic instead. Timing out an active-ack could then be treated as a system error, as described
* in the following section.
* ## Invoker health checking
* Invoker health is determined via a Kafka-based protocol, where each invoker pings the loadbalancer every second. If
* no ping is seen for a defined amount of time, the invoker is considered "Offline".
* Moreover, results from all activations are inspected. If more than 3 out of the last 10 activations contained system
* errors, the invoker is considered "Unhealthy". If an invoker is unhealthy, no user workload is sent to it, but
* test-actions are sent by the loadbalancer to check if system errors are still happening. If the
* system-error-threshold-count in the last 10 activations falls below 3, the invoker is considered "Healthy" again.
* To summarize:
* - "Offline": Ping missing for > 10 seconds
* - "Unhealthy": > 3 **system-errors** in the last 10 activations, pings arriving as usual
* - "Healthy": < 3 **system-errors** in the last 10 activations, pings arriving as usual
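The rules above can be sketched as a small state tracker. This is a hypothetical model, not the invoker supervision code. Results are recorded as Booleans (true = system error). Because the rules define only "> 3" (Unhealthy) and "< 3" (Healthy), an error count of exactly 3 is assumed to keep the current state. The 10-second ping timeout is taken from the summary above.

```scala
// Hypothetical health tracker following the summary above.
object HealthSketch {
  sealed trait Status
  case object Healthy extends Status
  case object Unhealthy extends Status
  case object Offline extends Status

  final case class Tracker(
    window: Vector[Boolean] = Vector.empty[Boolean], // last results, true = system error
    status: Status = Healthy,
    lastPingMillis: Long = 0L) {

    def recordPing(nowMillis: Long): Tracker = copy(lastPingMillis = nowMillis)

    def recordResult(systemError: Boolean): Tracker = {
      val newWindow = (window :+ systemError).takeRight(10) // keep the last 10 activations
      val errors = newWindow.count(identity)
      val newStatus =
        if (errors > 3) Unhealthy
        else if (errors < 3) Healthy
        else status // exactly 3: assumed to keep the current state
      copy(window = newWindow, status = newStatus)
    }

    // a missing ping overrides the activation-based status
    def statusAt(nowMillis: Long): Status =
      if (nowMillis - lastPingMillis > 10000L) Offline else status
  }
}
```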
* ## Horizontal sharding
* Sharding is employed so that the loadbalancers do not have to share any data, because the metrics used in scheduling
* change very quickly.
* Horizontal sharding means that each invoker's capacity is evenly divided between the loadbalancers. If an invoker
* has at most 16 slots available (invoker-busy-threshold = 16), those will be divided into 8 slots for each loadbalancer
* (if there are 2).
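Per invoker, the division can be sketched in MB as follows. `ShardingSketch.shardMB` is a hypothetical helper. It assumes integer division and a floor of one minimal action (`minActionMB`), mirroring what `getInvokerSlot` does with `MemoryLimit.MIN_MEMORY`.

```scala
// Hypothetical helper: this loadbalancer's share of one invoker's memory.
object ShardingSketch {
  def shardMB(invokerUserMemoryMB: Long, clusterSize: Int, minActionMB: Long): Long = {
    val size = clusterSize max 1 // a reported cluster size < 1 falls back to 1 (alone)
    val share = invokerUserMemoryMB / size
    share max minActionMB // never hand out less than one minimal action
  }
}
```

Treating the 16 slots from the example as 16 units, `shardMB(16, 2, 1)` yields 8 per loadbalancer.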
* If concurrent activation processing is enabled (and concurrency limit is > 1), accounting of containers and
* concurrency capacity per container will limit the number of concurrent activations routed to the particular
* slot at an invoker. Default max concurrency is 1.
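A simplified, single-threaded model of this accounting is sketched below. It is not the `NestedSemaphore` implementation, and the names are hypothetical. Memory is reserved once per container, and each container is assumed to admit `maxConcurrent` activations before a new container (and more memory) is needed.

```scala
import scala.collection.mutable

// Hypothetical, single-threaded model of concurrency-aware slot accounting.
object ConcurrencySketch {
  final class InvokerBook(var freeMemoryMB: Int) {
    // free concurrent slots in containers this loadbalancer has accounted for, per action
    private val freeSlots = mutable.Map.empty[String, Int].withDefaultValue(0)

    def tryAcquire(action: String, maxConcurrent: Int, memoryMB: Int): Boolean =
      if (freeSlots(action) > 0) {
        freeSlots(action) -= 1 // reuse a slot in an already-counted container
        true
      } else if (freeMemoryMB >= memoryMB) {
        freeMemoryMB -= memoryMB // account for a new container
        freeSlots(action) += maxConcurrent - 1 // this activation uses one of its slots
        true
      } else false

    def release(action: String, maxConcurrent: Int, memoryMB: Int): Unit = {
      freeSlots(action) += 1
      if (freeSlots(action) == maxConcurrent) {
        // a whole container's worth of slots is idle: give its memory back
        freeSlots(action) -= maxConcurrent
        freeMemoryMB += memoryMB
      }
    }
  }
}
```

With `maxConcurrent = 1`, this degenerates to plain per-activation memory accounting.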
* Known caveats:
* - If a loadbalancer leaves or joins the cluster, all state is removed and created from scratch. Those events should
* not happen often.
* - If concurrent activation processing is enabled, it only accounts for the containers that the current loadbalancer
* knows about. So the actual number of containers launched at the invoker may be lower than the number counted at the
* loadbalancer, since the invoker may skip a container launch if there is concurrent capacity available in a container
* launched via some other loadbalancer.
class ShardingContainerPoolBalancer(
config: WhiskConfig,
controllerInstance: ControllerInstanceId,
feedFactory: FeedFactory,
val invokerPoolFactory: InvokerPoolFactory,
implicit val messagingProvider: MessagingProvider = SpiLoader.get[MessagingProvider])(
implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer)
extends CommonLoadBalancer(config, feedFactory, controllerInstance) {
/** Build a cluster of all loadbalancers */
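  private val cluster: Option[Cluster] = if (loadConfigOrThrow[ClusterConfig](ConfigKeys.cluster).useClusterBootstrap) {
    akka.management.scaladsl.AkkaManagement(actorSystem).start()
    akka.management.cluster.bootstrap.ClusterBootstrap(actorSystem).start()
    Some(Cluster(actorSystem))
  } else if (loadConfigOrThrow[Seq[String]]("akka.cluster.seed-nodes").nonEmpty) {
    Some(Cluster(actorSystem))
  } else {
    None
  }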
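  override protected def emitMetrics() = {
    super.emitMetrics()
    MetricEmitter.emitGaugeMetric(
      INVOKER_TOTALMEM_BLACKBOX,
      schedulingState.blackboxInvokers.foldLeft(0L) { (total, curr) =>
        if (curr.status.isUsable) {
          curr.id.userMemory.toMB + total
        } else {
          total
        }
      })
    MetricEmitter.emitGaugeMetric(
      INVOKER_TOTALMEM_MANAGED,
      schedulingState.managedInvokers.foldLeft(0L) { (total, curr) =>
        if (curr.status.isUsable) {
          curr.id.userMemory.toMB + total
        } else {
          total
        }
      })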
    MetricEmitter.emitGaugeMetric(HEALTHY_INVOKER_MANAGED, schedulingState.managedInvokers.count(_.status == Healthy))
    MetricEmitter.emitGaugeMetric(
      UNHEALTHY_INVOKER_MANAGED,
      schedulingState.managedInvokers.count(_.status == Unhealthy))
    MetricEmitter.emitGaugeMetric(
      UNRESPONSIVE_INVOKER_MANAGED,
      schedulingState.managedInvokers.count(_.status == Unresponsive))
    MetricEmitter.emitGaugeMetric(OFFLINE_INVOKER_MANAGED, schedulingState.managedInvokers.count(_.status == Offline))
    MetricEmitter.emitGaugeMetric(HEALTHY_INVOKER_BLACKBOX, schedulingState.blackboxInvokers.count(_.status == Healthy))
    MetricEmitter.emitGaugeMetric(
      UNHEALTHY_INVOKER_BLACKBOX,
      schedulingState.blackboxInvokers.count(_.status == Unhealthy))
    MetricEmitter.emitGaugeMetric(
      UNRESPONSIVE_INVOKER_BLACKBOX,
      schedulingState.blackboxInvokers.count(_.status == Unresponsive))
    MetricEmitter.emitGaugeMetric(OFFLINE_INVOKER_BLACKBOX, schedulingState.blackboxInvokers.count(_.status == Offline))
  }
/** State needed for scheduling. */
val schedulingState = ShardingContainerPoolBalancerState()(lbConfig)
* Monitors invoker supervision and the cluster to update the state sequentially.
* All state updates should go through this actor to guarantee that
* [[ShardingContainerPoolBalancerState.updateInvokers]] and [[ShardingContainerPoolBalancerState.updateCluster]]
* are called exclusive of each other and not concurrently.
  private val monitor = actorSystem.actorOf(Props(new Actor {
    override def preStart(): Unit = {
      cluster.foreach(_.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent]))
    }

    // all members of the cluster that are available
    var availableMembers = Set.empty[Member]

    override def receive: Receive = {
      case CurrentInvokerPoolState(newState) =>
        schedulingState.updateInvokers(newState)

      // State of the cluster as it is right now
      case CurrentClusterState(members, _, _, _, _) =>
        availableMembers = members.filter(_.status == MemberStatus.Up)
        schedulingState.updateCluster(availableMembers.size)

      // General lifecycle events and events concerning the reachability of members. Split-brain is not a huge concern
      // in this case as only the invoker-threshold is adjusted according to the perceived cluster-size.
      // Taking the unreachable member out of the cluster from that point-of-view results in a better experience
      // even under split-brain-conditions, as that (in the worst-case) results in premature overloading of invokers vs.
      // going into overflow mode prematurely.
      case event: ClusterDomainEvent =>
        availableMembers = event match {
          case MemberUp(member)          => availableMembers + member
          case ReachableMember(member)   => availableMembers + member
          case MemberRemoved(member, _)  => availableMembers - member
          case UnreachableMember(member) => availableMembers - member
          case _                         => availableMembers
        }
        schedulingState.updateCluster(availableMembers.size)
    }
  }))
/** Loadbalancer interface methods */
override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = Future.successful(schedulingState.invokers)
override def clusterSize: Int = schedulingState.clusterSize
/** 1. Publish a message to the loadbalancer */
override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
val isBlackboxInvocation = action.exec.pull
val actionType = if (!isBlackboxInvocation) "managed" else "blackbox"
val (invokersToUse, stepSizes) =
if (!isBlackboxInvocation) (schedulingState.managedInvokers, schedulingState.managedStepSizes)
else (schedulingState.blackboxInvokers, schedulingState.blackboxStepSizes)
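    val chosen = if (invokersToUse.nonEmpty) {
      val hash = ShardingContainerPoolBalancer.generateHash(msg.user.namespace.name, action.fullyQualifiedName(false))
      val homeInvoker = hash % invokersToUse.size
      val stepSize = stepSizes(hash % stepSizes.size)
      val invoker: Option[(InvokerInstanceId, Boolean)] = ShardingContainerPoolBalancer.schedule(
        action.limits.concurrency.maxConcurrent,
        action.fullyQualifiedName(true),
        invokersToUse,
        schedulingState.invokerSlots,
        action.limits.memory.megabytes,
        homeInvoker,
        stepSize)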
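      invoker.foreach {
        case (_, true) =>
          // the invoker was assigned by the random "overload" fallback
          val metric =
            if (isBlackboxInvocation)
              LoggingMarkers.BLACKBOX_SYSTEM_OVERLOAD
            else
              LoggingMarkers.MANAGED_SYSTEM_OVERLOAD
          MetricEmitter.emitCounterMetric(metric)
        case _ =>
      }
      invoker.map(_._1)
    } else {
      None
    }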
    chosen
      .map { invoker =>
        // MemoryLimit() and TimeLimit() return singletons - they should be fast enough to be used here
        val memoryLimit = action.limits.memory
        val memoryLimitInfo = if (memoryLimit == MemoryLimit()) { "std" } else { "non-std" }
        val timeLimit = action.limits.timeout
        val timeLimitInfo = if (timeLimit == TimeLimit()) { "std" } else { "non-std" }
        logging.info(
          this,
          s"scheduled activation ${msg.activationId}, action '${msg.action.asString}' ($actionType), ns '${msg.user.namespace.name.asString}', mem limit ${memoryLimit.megabytes} MB (${memoryLimitInfo}), time limit ${timeLimit.duration.toMillis} ms (${timeLimitInfo}) to ${invoker}")
        val activationResult = setupActivation(msg, action, invoker)
        sendActivationToInvoker(messageProducer, msg, invoker).map(_ => activationResult)
      }
      .getOrElse {
        // report the state of all invokers
        val invokerStates = invokersToUse.foldLeft(Map.empty[InvokerState, Int]) { (agg, curr) =>
          val count = agg.getOrElse(curr.status, 0) + 1
          agg + (curr.status -> count)
        }

        logging.error(
          this,
          s"failed to schedule activation ${msg.activationId}, action '${msg.action.asString}' ($actionType), ns '${msg.user.namespace.name.asString}' - invokers to use: $invokerStates")
        Future.failed(LoadBalancerException("No invokers available"))
      }
  }
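  override val invokerPool =
    invokerPoolFactory.createInvokerPool(
      actorSystem,
      messagingProvider,
      messageProducer,
      sendActivationToInvoker,
      Some(monitor))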
  override protected def releaseInvoker(invoker: InvokerInstanceId, entry: ActivationEntry) = {
    schedulingState.invokerSlots
      .lift(invoker.toInt)
      .foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName, entry.maxConcurrent, entry.memoryLimit.toMB.toInt))
  }
}
object ShardingContainerPoolBalancer extends LoadBalancerProvider {
override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): LoadBalancer = {
val invokerPoolFactory = new InvokerPoolFactory {
override def createInvokerPool(
actorRefFactory: ActorRefFactory,
messagingProvider: MessagingProvider,
messagingProducer: MessageProducer,
sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
monitor: Option[ActorRef]): ActorRef = {
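        InvokerPool.prepare(instance, WhiskEntityStore.datastore())

        actorRefFactory.actorOf(
          InvokerPool.props(
            (f, i) => f.actorOf(InvokerActor.props(i, instance)),
            (m, i) => sendActivationToInvoker(messagingProducer, m, i),
            messagingProvider.getConsumer(whiskConfig, s"health${instance.asString}", "health", maxPeek = 128),
            monitor))
      }
    }
    new ShardingContainerPoolBalancer(
      whiskConfig,
      instance,
      createFeedFactory(whiskConfig, instance),
      invokerPoolFactory)
  }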
def requiredProperties: Map[String, String] = kafkaHosts
/** Generates a hash based on the string representation of namespace and action */
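  def generateHash(namespace: EntityName, action: FullyQualifiedEntityName): Int = {
    val hash = namespace.asString.hashCode() ^ action.asString.hashCode()
    // Int.MinValue.abs is still negative; map it to 0 so that `hash % n` is always a valid index
    if (hash == Int.MinValue) 0 else hash.abs
  }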
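`generateHash` relies on `.abs` to make the hash non-negative before it is used with `%`. One JVM detail matters here: `Int.MinValue` has no positive counterpart, so its absolute value is still negative. The sketch below demonstrates this. `HashSketch.nonNegative` is a hypothetical guard, not part of the balancer.

```scala
// Demonstrates the Int.MinValue corner case of abs and one way to guard against it.
object HashSketch {
  // hypothetical helper: a non-negative value suitable for `% n` indexing
  def nonNegative(h: Int): Int = if (h == Int.MinValue) 0 else h.abs
}
```

Without such a guard, `Int.MinValue.abs % 10` is -8, which would be an invalid invoker index.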
/** Euclidean algorithm to determine the greatest-common-divisor */
def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)
  /** Returns the numbers in 1..x that are coprime to x and pairwise coprime to each other. */
  def pairwiseCoprimeNumbersUntil(x: Int): IndexedSeq[Int] =
    (1 to x).foldLeft(IndexedSeq.empty[Int])((primes, cur) => {
      if (gcd(cur, x) == 1 && primes.forall(i => gcd(i, cur) == 1)) {
        primes :+ cur
      } else primes
    })
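The coprime step sizes matter because of a number-theoretic property: stepping through `n` invokers with a step size coprime to `n` visits every index exactly once. The following check uses a hypothetical helper, not part of the balancer, to verify this for small invoker counts.

```scala
// Property check: every step size coprime to n yields a full permutation of 0 until n.
object CoverageSketch {
  @scala.annotation.tailrec
  def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)

  // indices visited in n steps from `start`
  def visits(start: Int, step: Int, n: Int): Seq[Int] =
    (0 until n).map(i => (start + i * step) % n)

  def coversAll(n: Int): Boolean =
    (1 to n).filter(step => gcd(step, n) == 1).forall { step =>
      (0 until n).forall(start => visits(start, step, n).toSet.size == n)
    }
}
```

A step that shares a factor with `n` (for example 2 with 10 invokers) only ever reaches a subset of the invokers.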
* Scans through all invokers and tries to get a free slot on one of them. If no slot can be obtained, randomly picks
* a healthy invoker.
* @param maxConcurrent concurrency limit supported by this action
* @param fqn fully qualified name of the action to be scheduled
* @param invokers a list of available invokers to search in, including their state
* @param dispatched semaphores for each invoker to give the slots away from
* @param slots number of slots that need to be acquired (e.g. memory in MB)
* @param index the index to start from (initially should be the "homeInvoker")
* @param step the step size used to progress through the invokers
* @param stepsDone the number of invokers checked so far
* @return the invoker to schedule to, together with a flag that is true if the slots were force-acquired because the
* system is overloaded, or None if no invoker is available
def schedule(
maxConcurrent: Int,
fqn: FullyQualifiedEntityName,
invokers: IndexedSeq[InvokerHealth],
dispatched: IndexedSeq[NestedSemaphore[FullyQualifiedEntityName]],
slots: Int,
index: Int,
step: Int,
stepsDone: Int = 0)(implicit logging: Logging, transId: TransactionId): Option[(InvokerInstanceId, Boolean)] = {
    val numInvokers = invokers.size
    if (numInvokers > 0) {
      val invoker = invokers(index)
      // test this invoker: try to acquire the memory slots (and concurrency permits) for this action on it
      if (invoker.status.isUsable && dispatched(invoker.id.toInt).tryAcquireConcurrent(fqn, maxConcurrent, slots)) {
        Some((invoker.id, false))
      } else {
        // If we've gone through all invokers
        if (stepsDone == numInvokers + 1) {
          val healthyInvokers = invokers.filter(_.status.isUsable)
          if (healthyInvokers.nonEmpty) {
            // Choose a healthy invoker randomly
            val random = healthyInvokers(ThreadLocalRandom.current().nextInt(healthyInvokers.size)).id
            dispatched(random.toInt).forceAcquireConcurrent(fqn, maxConcurrent, slots)
            logging.warn(this, s"system is overloaded. Chose invoker${random.toInt} by random assignment.")
            Some((random, true))
          } else {
            None
          }
        } else {
          val newIndex = (index + step) % numInvokers
          schedule(maxConcurrent, fqn, invokers, dispatched, slots, newIndex, step, stepsDone + 1)
        }
      }
    } else {
      None
    }
  }
}
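A simplified sketch of the scan-then-fallback loop in `schedule` follows. It rests on several assumptions. Free capacity is a plain mutable counter of MB per invoker instead of a `NestedSemaphore`, concurrency is ignored, and each invoker is checked exactly once (the method above allows `numInvokers + 1` steps before falling back). All names are hypothetical.

```scala
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec

// Hypothetical, simplified version of the scheduling loop.
object ScheduleSketch {
  final class Invoker(val healthy: Boolean, var freeMB: Int)

  // Some((index, forced)) or None if no invoker is healthy
  def schedule(invokers: IndexedSeq[Invoker], slotsMB: Int, home: Int, step: Int): Option[(Int, Boolean)] = {
    val n = invokers.size

    @tailrec
    def loop(index: Int, stepsDone: Int): Option[(Int, Boolean)] =
      if (stepsDone == n) {
        // every invoker was checked: overload, so pick a healthy invoker at random
        val healthy = invokers.indices.filter(i => invokers(i).healthy)
        if (healthy.isEmpty) None
        else {
          val chosen = healthy(ThreadLocalRandom.current().nextInt(healthy.size))
          invokers(chosen).freeMB -= slotsMB // forced: may go negative
          Some((chosen, true))
        }
      } else {
        val inv = invokers(index)
        if (inv.healthy && inv.freeMB >= slotsMB) {
          inv.freeMB -= slotsMB
          Some((index, false))
        } else loop((index + step) % n, stepsDone + 1)
      }

    if (n == 0) None else loop(home % n, 0)
  }
}
```

The Boolean in the result mirrors the "forced" flag that the balancer uses to emit its overload metrics.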
* Holds the state necessary for scheduling of actions.
* @param _invokers all of the known invokers in the system
* @param _managedInvokers all invokers for managed runtimes
* @param _blackboxInvokers all invokers for blackbox runtimes
* @param _managedStepSizes the step-sizes possible for the current managed invoker count
* @param _blackboxStepSizes the step-sizes possible for the current blackbox invoker count
* @param _invokerSlots state of accessible slots of each invoker
case class ShardingContainerPoolBalancerState(
private var _invokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
private var _managedInvokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
private var _blackboxInvokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
private var _managedStepSizes: Seq[Int] = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
private var _blackboxStepSizes: Seq[Int] = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
  protected[loadBalancer] var _invokerSlots: IndexedSeq[NestedSemaphore[FullyQualifiedEntityName]] =
    IndexedSeq.empty[NestedSemaphore[FullyQualifiedEntityName]],
private var _clusterSize: Int = 1)(
lbConfig: ShardingContainerPoolBalancerConfig =
loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer))(implicit logging: Logging) {
  // Managed fraction and blackbox fraction can be between 0.0 and 1.0. The sum of these two fractions has to be between
  // 1.0 and 2.0.
  // If the sum is 1.0, there is no overlap of blackbox and managed invokers. If the sum is 2.0, there is no
  // differentiation between managed and blackbox invokers.
  // If the sum is below 1.0 with the initial values from config, the blackbox fraction is raised above the configured
  // value so that the two fractions add up to 1.0.
  private val managedFraction: Double = Math.max(0.0, Math.min(1.0, lbConfig.managedFraction))
  private val blackboxFraction: Double = Math.max(1.0 - managedFraction, Math.min(1.0, lbConfig.blackboxFraction))
  logging.info(this, s"managedFraction = $managedFraction, blackboxFraction = $blackboxFraction")(
    TransactionId.loadbalancer)
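The following sketch shows how the clamped fractions turn into invoker counts. `FractionSketch` is hypothetical. It repeats the clamping above and the rounding used in `updateInvokers` (ceil for managed, floor for blackbox, at least one each). Managed invokers are taken from the front of the list and blackbox invokers from the back.

```scala
// Hypothetical helper reproducing the managed/blackbox split.
object FractionSketch {
  def split(numInvokers: Int, managedFractionCfg: Double, blackboxFractionCfg: Double): (Int, Int) = {
    val managedFraction = math.max(0.0, math.min(1.0, managedFractionCfg))
    // raised so that both fractions add up to at least 1.0
    val blackboxFraction = math.max(1.0 - managedFraction, math.min(1.0, blackboxFractionCfg))
    val managed = math.max(1, math.ceil(numInvokers * managedFraction).toInt)
    val blackboxes = math.max(1, math.floor(numInvokers * blackboxFraction).toInt)
    (managed, blackboxes)
  }

  // indices of managed invokers (front) and blackbox invokers (back); they may overlap
  def assign(numInvokers: Int, managed: Int, blackboxes: Int): (Seq[Int], Seq[Int]) = {
    val all = 0 until numInvokers
    (all.take(managed), all.takeRight(blackboxes))
  }
}
```

With a single invoker, both groups contain that invoker, which is the overlap "for small N" mentioned in `updateInvokers`.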
/** Getters for the variables, setting from the outside is only allowed through the update methods below */
def invokers: IndexedSeq[InvokerHealth] = _invokers
def managedInvokers: IndexedSeq[InvokerHealth] = _managedInvokers
def blackboxInvokers: IndexedSeq[InvokerHealth] = _blackboxInvokers
def managedStepSizes: Seq[Int] = _managedStepSizes
def blackboxStepSizes: Seq[Int] = _blackboxStepSizes
def invokerSlots: IndexedSeq[NestedSemaphore[FullyQualifiedEntityName]] = _invokerSlots
def clusterSize: Int = _clusterSize
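* Calculates this loadbalancer's share (shard) of an invoker's user memory.
* @param memory the user memory of the invoker
* @return calculated invoker slot: the memory divided by the cluster size, but at least the minimum memory of one action
  private def getInvokerSlot(memory: ByteSize): ByteSize = {
    val invokerShardMemorySize = memory / _clusterSize
    val newThreshold = if (invokerShardMemorySize < MemoryLimit.MIN_MEMORY) {
      logging.error(
        this,
        s"registered controllers: calculated controller's invoker shard memory size falls below the min memory of one action. "
          + s"Setting to min memory. Expect invoker overloads. Cluster size ${_clusterSize}, invoker user memory size ${memory.toMB.MB}, "
          + s"min action memory size ${MemoryLimit.MIN_MEMORY.toMB.MB}, calculated shard size ${invokerShardMemorySize.toMB.MB}.")(
        TransactionId.loadbalancer)
      MemoryLimit.MIN_MEMORY
    } else {
      invokerShardMemorySize
    }
    newThreshold
  }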
* Updates the scheduling state with the new invokers.
* This does not need to happen atomically, since dirty reads of the values set are not dangerous. It is important though
* to update the "invokers" variables last, since they will determine the range of invokers to choose from.
* Handling a shrinking invokers list is not necessary, because InvokerPool won't shrink its own list but rather
* report the invoker as "Offline".
* It is important that this method does not run concurrently to itself and/or to [[updateCluster]]
  def updateInvokers(newInvokers: IndexedSeq[InvokerHealth]): Unit = {
    val oldSize = _invokers.size
    val newSize = newInvokers.size

    // for small N, allow the managed invokers to overlap with blackbox invokers, and
    // further assume that blackbox invokers << managed invokers
    val managed = Math.max(1, Math.ceil(newSize.toDouble * managedFraction).toInt)
    val blackboxes = Math.max(1, Math.floor(newSize.toDouble * blackboxFraction).toInt)

    _invokers = newInvokers
    _managedInvokers = _invokers.take(managed)
    _blackboxInvokers = _invokers.takeRight(blackboxes)

    val logDetail = if (oldSize != newSize) {
      _managedStepSizes = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(managed)
      _blackboxStepSizes = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(blackboxes)

      if (oldSize < newSize) {
        // Keeps the existing state.
        val onlyNewInvokers = _invokers.drop(_invokerSlots.length)
        _invokerSlots = _invokerSlots ++ onlyNewInvokers.map { invoker =>
          new NestedSemaphore[FullyQualifiedEntityName](getInvokerSlot(invoker.id.userMemory).toMB.toInt)
        }
        val newInvokerDetails = onlyNewInvokers
          .map(i =>
            s"${i.id.toString}: ${i.status} / ${getInvokerSlot(i.id.userMemory).toMB.MB} of ${i.id.userMemory.toMB.MB}")
          .mkString(", ")
        s"number of known invokers increased: new = $newSize, old = $oldSize. details: $newInvokerDetails."
      } else {
        s"number of known invokers decreased: new = $newSize, old = $oldSize."
      }
    } else {
      s"no update required - number of known invokers unchanged: $newSize."
    }

    logging.info(
      this,
      s"loadbalancer invoker status updated. managedInvokers = $managed blackboxInvokers = $blackboxes. $logDetail")(
      TransactionId.loadbalancer)
  }
* Updates the size of a cluster. Throws away all state for simplicity.
* This does not need to happen atomically, since a dirty read of the values set is not dangerous. At worst the
* scheduler works on outdated invoker-load data, which is acceptable.
* It is important that this method does not run concurrently to itself and/or to [[updateInvokers]]
  def updateCluster(newSize: Int): Unit = {
    val actualSize = newSize max 1 // if a cluster size < 1 is reported, falls back to a size of 1 (alone)
    if (_clusterSize != actualSize) {
      val oldSize = _clusterSize
      _clusterSize = actualSize
      _invokerSlots = _invokers.map { invoker =>
        new NestedSemaphore[FullyQualifiedEntityName](getInvokerSlot(invoker.id.userMemory).toMB.toInt)
      }
      // Directly after startup, no invokers have registered yet. This needs to be handled gracefully.
      val invokerCount = _invokers.size
      val totalInvokerMemory =
        _invokers.foldLeft(0L)((total, invoker) => total + getInvokerSlot(invoker.id.userMemory).toMB).MB
      val averageInvokerMemory =
        if (totalInvokerMemory.toMB > 0 && invokerCount > 0) {
          (totalInvokerMemory / invokerCount).toMB.MB
        } else {
          0L.MB
        }
      logging.info(
        this,
        s"loadbalancer cluster size changed from $oldSize to $actualSize active nodes. ${invokerCount} invokers with ${averageInvokerMemory} average memory size - total invoker memory ${totalInvokerMemory}.")(
        TransactionId.loadbalancer)
    }
  }
}
* Configuration for the cluster created between loadbalancers.
* @param useClusterBootstrap Whether or not to use a bootstrap mechanism
case class ClusterConfig(useClusterBootstrap: Boolean)
* Configuration for the sharding container pool balancer.
* @param managedFraction the fraction of all invokers to use for managed runtimes
* @param blackboxFraction the fraction of all invokers to use for blackbox runtimes (may overlap with the managed
* invokers if the two fractions add up to more than 1.0)
* @param timeoutFactor factor to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + timeoutAddon)
* @param timeoutAddon extra time to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + timeoutAddon)
case class ShardingContainerPoolBalancerConfig(managedFraction: Double,
blackboxFraction: Double,
timeoutFactor: Int,
timeoutAddon: FiniteDuration)
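The forced active-ack timeout formula documented for `timeoutFactor` and `timeoutAddon` can be written with `scala.concurrent.duration`. `TimeoutSketch` is a hypothetical name, and the time limit is a parameter (the documentation names `time-limit.std`). The values in the usage below are illustrative, not OpenWhisk defaults.

```scala
import scala.concurrent.duration._

// Hypothetical helper: timeLimit * timeoutFactor + timeoutAddon
object TimeoutSketch {
  def forcedAckTimeout(timeLimit: FiniteDuration, timeoutFactor: Int, timeoutAddon: FiniteDuration): FiniteDuration =
    timeLimit * timeoutFactor.toLong + timeoutAddon
}
```

For example, a 1 minute limit with `timeoutFactor = 2` and a 1 minute addon gives a 3 minute timeout.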
* State kept for each activation slot until completion.
* @param id id of the activation
* @param namespaceId namespace that invoked the action
* @param invokerName invoker the action is scheduled to
* @param memoryLimit memory limit of the invoked action
* @param timeLimit time limit of the invoked action
* @param maxConcurrent concurrency limit of the invoked action
* @param fullyQualifiedEntityName fully qualified name of the invoked action
* @param timeoutHandler times out completion of this activation, should be canceled on good paths
* @param isBlackbox true if the invoked action is a blackbox action, otherwise false (managed action)
* @param isBlocking true if the action is invoked in a blocking fashion, i.e. "somebody" waits for the result
case class ActivationEntry(id: ActivationId,
namespaceId: UUID,
invokerName: InvokerInstanceId,
memoryLimit: ByteSize,
timeLimit: FiniteDuration,
maxConcurrent: Int,
fullyQualifiedEntityName: FullyQualifiedEntityName,
timeoutHandler: Cancellable,
isBlackbox: Boolean,
isBlocking: Boolean)