blob: dbadbce6e9bc2a38e5f363e2db63326293a26306 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.loadBalancer.test
import akka.actor.ActorRef
import akka.actor.ActorRefFactory
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.testkit.TestProbe
import common.{StreamLogging, WhiskProperties}
import java.nio.charset.StandardCharsets
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.junit.JUnitRunner
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import org.apache.openwhisk.common.{InvokerHealth, Logging, NestedSemaphore, TransactionId}
import org.apache.openwhisk.core.entity.FullyQualifiedEntityName
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.connector.CompletionMessage
import org.apache.openwhisk.core.connector.Message
import org.apache.openwhisk.core.connector.MessageConsumer
import org.apache.openwhisk.core.connector.MessageProducer
import org.apache.openwhisk.core.connector.MessagingProvider
import org.apache.openwhisk.core.entity.ActivationId
import org.apache.openwhisk.core.entity.BasicAuthenticationAuthKey
import org.apache.openwhisk.core.entity.ControllerInstanceId
import org.apache.openwhisk.core.entity.EntityName
import org.apache.openwhisk.core.entity.EntityPath
import org.apache.openwhisk.core.entity.ExecManifest
import org.apache.openwhisk.core.entity.Identity
import org.apache.openwhisk.core.entity.InvokerInstanceId
import org.apache.openwhisk.core.entity.MemoryLimit
import org.apache.openwhisk.core.entity.Namespace
import org.apache.openwhisk.core.entity.Secret
import org.apache.openwhisk.core.entity.Subject
import org.apache.openwhisk.core.entity.UUID
import org.apache.openwhisk.core.entity.WhiskActionMetaData
import org.apache.openwhisk.core.entity.test.ExecHelpers
import org.apache.openwhisk.core.entity.ByteSize
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.entity.test.ExecHelpers
import org.apache.openwhisk.core.loadBalancer.FeedFactory
import org.apache.openwhisk.core.loadBalancer.InvokerPoolFactory
import org.apache.openwhisk.core.loadBalancer._
/**
* Unit tests for the ContainerPool object.
*
* These tests test only the "static" methods "schedule" and "remove"
* of the ContainerPool object.
*/
@RunWith(classOf[JUnitRunner])
class ShardingContainerPoolBalancerTests
extends FlatSpec
with Matchers
with StreamLogging
with ExecHelpers
with MockFactory {
behavior of "ShardingContainerPoolBalancerState"
val defaultUserMemory: ByteSize = 1024.MB
def healthy(i: Int, memory: ByteSize = defaultUserMemory) =
new InvokerHealth(InvokerInstanceId(i, userMemory = memory), Healthy)
def unhealthy(i: Int) = new InvokerHealth(InvokerInstanceId(i, userMemory = defaultUserMemory), Unhealthy)
def offline(i: Int) = new InvokerHealth(InvokerInstanceId(i, userMemory = defaultUserMemory), Offline)
def semaphores(count: Int, max: Int): IndexedSeq[NestedSemaphore[FullyQualifiedEntityName]] =
IndexedSeq.fill(count)(new NestedSemaphore[FullyQualifiedEntityName](max))
def lbConfig(blackboxFraction: Double, managedFraction: Option[Double] = None) =
ShardingContainerPoolBalancerConfig(
managedFraction.getOrElse(1.0 - blackboxFraction),
blackboxFraction,
1,
1.minute)
it should "update invoker's state, growing the slots data and keeping valid old data" in {
// start empty
val slots = 10
val memoryPerSlot = MemoryLimit.MIN_MEMORY
val memory = memoryPerSlot * slots
val state = ShardingContainerPoolBalancerState()(lbConfig(0.5))
state.invokers shouldBe 'empty
state.blackboxInvokers shouldBe 'empty
state.managedInvokers shouldBe 'empty
state.invokerSlots shouldBe 'empty
state.managedStepSizes shouldBe Seq.empty
state.blackboxStepSizes shouldBe Seq.empty
// apply one update, verify everything is updated accordingly
val update1 = IndexedSeq(healthy(0, memory))
state.updateInvokers(update1)
state.invokers shouldBe update1
state.blackboxInvokers shouldBe update1 // fallback to at least one
state.managedInvokers shouldBe update1 // fallback to at least one
state.invokerSlots should have size update1.size
state.invokerSlots.head.availablePermits shouldBe memory.toMB
state.managedStepSizes shouldBe Seq(1)
state.blackboxStepSizes shouldBe Seq(1)
// acquire a slot to alter invoker state
state.invokerSlots.head.tryAcquire(memoryPerSlot.toMB.toInt)
state.invokerSlots.head.availablePermits shouldBe (memory - memoryPerSlot).toMB.toInt
// apply second update, growing the state
val update2 =
IndexedSeq(healthy(0, memory), healthy(1, memory * 2))
state.updateInvokers(update2)
state.invokers shouldBe update2
state.managedInvokers shouldBe IndexedSeq(update2.head)
state.blackboxInvokers shouldBe IndexedSeq(update2.last)
state.invokerSlots should have size update2.size
state.invokerSlots.head.availablePermits shouldBe (memory - memoryPerSlot).toMB.toInt
state.invokerSlots(1).tryAcquire(memoryPerSlot.toMB.toInt)
state.invokerSlots(1).availablePermits shouldBe memory.toMB * 2 - memoryPerSlot.toMB
state.managedStepSizes shouldBe Seq(1)
state.blackboxStepSizes shouldBe Seq(1)
}
it should "allow managed partition to overlap with blackbox for small N" in {
Seq(0.1, 0.2, 0.3, 0.4, 0.5).foreach { bf =>
val state = ShardingContainerPoolBalancerState()(lbConfig(bf))
(1 to 100).toSeq.foreach { i =>
state.updateInvokers((1 to i).map(_ => healthy(1, MemoryLimit.STD_MEMORY)))
withClue(s"invoker count $bf $i:") {
state.managedInvokers.length should be <= i
state.blackboxInvokers should have size Math.max(1, (bf * i).toInt)
val m = state.managedInvokers.length
val b = state.blackboxInvokers.length
bf match {
// written out explicitly for clarity
case 0.1 if i < 10 => m + b shouldBe i + 1
case 0.2 if i < 5 => m + b shouldBe i + 1
case 0.3 if i < 4 => m + b shouldBe i + 1
case 0.4 if i < 3 => m + b shouldBe i + 1
case 0.5 if i < 2 => m + b shouldBe i + 1
case _ => m + b shouldBe i
}
}
}
}
}
it should "return the same pools if managed- and blackbox-pools are overlapping" in {
val state = ShardingContainerPoolBalancerState()(lbConfig(1.0, Some(1.0)))
(1 to 100).foreach { i =>
state.updateInvokers((1 to i).map(_ => healthy(1, MemoryLimit.STD_MEMORY)))
}
state.managedInvokers should have size 100
state.blackboxInvokers should have size 100
state.managedInvokers shouldBe state.blackboxInvokers
}
it should "update the cluster size, adjusting the invoker slots accordingly" in {
val slots = 10
val memoryPerSlot = MemoryLimit.MIN_MEMORY
val memory = memoryPerSlot * slots
val state = ShardingContainerPoolBalancerState()(lbConfig(0.5))
state.updateInvokers(IndexedSeq(healthy(0, memory), healthy(1, memory * 2)))
state.invokerSlots.head.tryAcquire(memoryPerSlot.toMB.toInt)
state.invokerSlots.head.availablePermits shouldBe (memory - memoryPerSlot).toMB
state.invokerSlots(1).tryAcquire(memoryPerSlot.toMB.toInt)
state.invokerSlots(1).availablePermits shouldBe memory.toMB * 2 - memoryPerSlot.toMB
state.updateCluster(2)
state.invokerSlots.head.availablePermits shouldBe memory.toMB / 2 // state reset + divided by 2
state.invokerSlots(1).availablePermits shouldBe memory.toMB
}
it should "fallback to a size of 1 (alone) if cluster size is < 1" in {
val slots = 10
val memoryPerSlot = MemoryLimit.MIN_MEMORY
val memory = memoryPerSlot * slots
val state = ShardingContainerPoolBalancerState()(lbConfig(0.5))
state.updateInvokers(IndexedSeq(healthy(0, memory)))
state.invokerSlots.head.availablePermits shouldBe memory.toMB
state.updateCluster(2)
state.invokerSlots.head.availablePermits shouldBe memory.toMB / 2
state.updateCluster(0)
state.invokerSlots.head.availablePermits shouldBe memory.toMB
state.updateCluster(-1)
state.invokerSlots.head.availablePermits shouldBe memory.toMB
}
it should "set the threshold to 1 if the cluster is bigger than there are slots on 1 invoker" in {
val slots = 10
val memoryPerSlot = MemoryLimit.MIN_MEMORY
val memory = memoryPerSlot * slots
val state = ShardingContainerPoolBalancerState()(lbConfig(0.5))
state.updateInvokers(IndexedSeq(healthy(0, memory)))
state.invokerSlots.head.availablePermits shouldBe memory.toMB
state.updateCluster(20)
state.invokerSlots.head.availablePermits shouldBe MemoryLimit.MIN_MEMORY.toMB
}
val namespace = EntityPath("testspace")
val name = EntityName("testname")
val fqn = FullyQualifiedEntityName(namespace, name)
behavior of "schedule"
implicit val transId = TransactionId.testing
it should "return None on an empty invoker list" in {
ShardingContainerPoolBalancer.schedule(
1,
fqn,
IndexedSeq.empty,
IndexedSeq.empty,
MemoryLimit.MIN_MEMORY.toMB.toInt,
index = 0,
step = 2) shouldBe None
}
it should "return None if no invokers are healthy" in {
val invokerCount = 3
val invokerSlots = semaphores(invokerCount, 3)
val invokers = (0 until invokerCount).map(unhealthy)
ShardingContainerPoolBalancer.schedule(
1,
fqn,
invokers,
invokerSlots,
MemoryLimit.MIN_MEMORY.toMB.toInt,
index = 0,
step = 2) shouldBe None
}
it should "choose the first available invoker, jumping in stepSize steps, falling back to randomized scheduling once all invokers are full" in {
val invokerCount = 3
val slotPerInvoker = 3
val invokerSlots = semaphores(invokerCount + 3, slotPerInvoker) // needs to be offset by 3 as well
val invokers = (0 until invokerCount).map(i => healthy(i + 3)) // offset by 3 to asset InstanceId is returned
val expectedResult = Seq(3, 3, 3, 5, 5, 5, 4, 4, 4)
val result = expectedResult.map { _ =>
ShardingContainerPoolBalancer
.schedule(1, fqn, invokers, invokerSlots, 1, index = 0, step = 2)
.get
._1
.toInt
}
result shouldBe expectedResult
val bruteResult = (0 to 100).map { _ =>
ShardingContainerPoolBalancer
.schedule(1, fqn, invokers, invokerSlots, 1, index = 0, step = 2)
.get
}
bruteResult.map(_._1.toInt) should contain allOf (3, 4, 5)
bruteResult.map(_._2) should contain only true
}
it should "ignore unhealthy or offline invokers" in {
val invokers = IndexedSeq(healthy(0), unhealthy(1), offline(2), healthy(3))
val slotPerInvoker = 3
val invokerSlots = semaphores(invokers.size, slotPerInvoker)
val expectedResult = Seq(0, 0, 0, 3, 3, 3)
val result = expectedResult.map { _ =>
ShardingContainerPoolBalancer
.schedule(1, fqn, invokers, invokerSlots, 1, index = 0, step = 1)
.get
._1
.toInt
}
result shouldBe expectedResult
// more schedules will result in randomized invokers, but the unhealthy and offline invokers should not be part
val bruteResult = (0 to 100).map { _ =>
ShardingContainerPoolBalancer
.schedule(1, fqn, invokers, invokerSlots, 1, index = 0, step = 1)
.get
}
bruteResult.map(_._1.toInt) should contain allOf (0, 3)
bruteResult.map(_._1.toInt) should contain noneOf (1, 2)
bruteResult.map(_._2) should contain only true
}
it should "only take invokers that have enough free slots" in {
val invokerCount = 3
// Each invoker has 4 slots
val invokerSlots = semaphores(invokerCount, 4)
val invokers = (0 until invokerCount).map(i => healthy(i))
// Ask for three slots -> First invoker should be used
ShardingContainerPoolBalancer
.schedule(1, fqn, invokers, invokerSlots, 3, index = 0, step = 1)
.get
._1
.toInt shouldBe 0
// Ask for two slots -> Second invoker should be used
ShardingContainerPoolBalancer
.schedule(1, fqn, invokers, invokerSlots, 2, index = 0, step = 1)
.get
._1
.toInt shouldBe 1
// Ask for 1 slot -> First invoker should be used
ShardingContainerPoolBalancer
.schedule(1, fqn, invokers, invokerSlots, 1, index = 0, step = 1)
.get
._1
.toInt shouldBe 0
// Ask for 4 slots -> Third invoker should be used
ShardingContainerPoolBalancer
.schedule(1, fqn, invokers, invokerSlots, 4, index = 0, step = 1)
.get
._1
.toInt shouldBe 2
// Ask for 2 slots -> Second invoker should be used
ShardingContainerPoolBalancer
.schedule(1, fqn, invokers, invokerSlots, 2, index = 0, step = 1)
.get
._1
.toInt shouldBe 1
invokerSlots.foreach(_.availablePermits shouldBe 0)
}
behavior of "pairwiseCoprimeNumbersUntil"
it should "return an empty set for malformed inputs" in {
ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0) shouldBe Seq.empty
ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq.empty
}
it should "return all coprime numbers until the number given" in {
ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1)
ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1)
ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2)
ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(4) shouldBe Seq(1, 3)
ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3)
ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5, 7)
ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7)
}
behavior of "concurrent actions"
it should "allow concurrent actions to be scheduled to same invoker without affecting memory slots" in {
val invokerCount = 3
// Each invoker has 2 slots, each action has concurrency 3
val slots = 2
val invokerSlots = semaphores(invokerCount, slots)
val concurrency = 3
val invokers = (0 until invokerCount).map(i => healthy(i))
(0 until invokerCount).foreach { i =>
(1 to slots).foreach { s =>
(1 to concurrency).foreach { c =>
ShardingContainerPoolBalancer
.schedule(concurrency, fqn, invokers, invokerSlots, 1, 0, 1)
.get
._1
.toInt shouldBe i
invokerSlots
.lift(i)
.get
.concurrentState(fqn)
.availablePermits shouldBe concurrency - c
}
}
}
}
implicit val am = ActorMaterializer()
val config = new WhiskConfig(ExecManifest.requiredProperties)
val invokerMem = 2000.MB
val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
val concurrency = if (concurrencyEnabled) 5 else 1
val actionMem = 256.MB
val actionMetaData =
WhiskActionMetaData(
namespace,
name,
jsMetaData(Some("jsMain"), false),
limits = actionLimits(actionMem, concurrency))
val maxContainers = invokerMem.toMB.toInt / actionMetaData.limits.memory.megabytes
val numInvokers = 3
val maxActivations = maxContainers * numInvokers * concurrency
//run a separate test for each variant of 1..n concurrently-ish arriving activations, to exercise:
// - no containers started
// - containers started but no concurrency room
// - no concurrency room and no memory room to launch new containers
//(1 until maxActivations).foreach { i =>
(75 until maxActivations).foreach { i =>
it should s"reflect concurrent processing $i state in containerSlots" in {
//each batch will:
// - submit activations concurrently
// - wait for activation submission to messaging system (mostly to detect which invoker was assiged
// - verify remaining concurrency slots available
// - complete activations concurrently
// - verify concurrency/memory slots are released
testActivationBatch(i)
}
}
def mockMessaging(): MessagingProvider = {
val messaging = stub[MessagingProvider]
val producer = stub[MessageProducer]
val consumer = stub[MessageConsumer]
(messaging
.getProducer(_: WhiskConfig, _: Option[ByteSize])(_: Logging, _: ActorSystem))
.when(*, *, *, *)
.returns(producer)
(messaging
.getConsumer(_: WhiskConfig, _: String, _: String, _: Int, _: FiniteDuration)(_: Logging, _: ActorSystem))
.when(*, *, *, *, *, *, *)
.returns(consumer)
(producer
.send(_: String, _: Message, _: Int))
.when(*, *, *)
.returns(Future.successful(new RecordMetadata(new TopicPartition("fake", 0), 0, 0, 0l, 0l, 0, 0)))
messaging
}
def testActivationBatch(numActivations: Int): Unit = {
//setup mock messaging
val feedProbe = new FeedFactory {
def createFeed(f: ActorRefFactory, m: MessagingProvider, p: (Array[Byte]) => Future[Unit]) =
TestProbe().testActor
}
val invokerPoolProbe = new InvokerPoolFactory {
override def createInvokerPool(
actorRefFactory: ActorRefFactory,
messagingProvider: MessagingProvider,
messagingProducer: MessageProducer,
sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
monitor: Option[ActorRef]): ActorRef =
TestProbe().testActor
}
val balancer =
new ShardingContainerPoolBalancer(config, ControllerInstanceId("0"), feedProbe, invokerPoolProbe, mockMessaging)
val invokers = IndexedSeq.tabulate(numInvokers) { i =>
new InvokerHealth(InvokerInstanceId(i, userMemory = invokerMem), Healthy)
}
balancer.schedulingState.updateInvokers(invokers)
val invocationNamespace = EntityName("invocationSpace")
val fqn = actionMetaData.fullyQualifiedName(true)
val hash =
ShardingContainerPoolBalancer.generateHash(invocationNamespace, actionMetaData.fullyQualifiedName(false))
val home = hash % invokers.size
val stepSizes = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(invokers.size)
val stepSize = stepSizes(hash % stepSizes.size)
val uuid = UUID()
//initiate activation
val published = (0 until numActivations).map { _ =>
val aid = ActivationId.generate()
val msg = ActivationMessage(
TransactionId.testing,
actionMetaData.fullyQualifiedName(true),
actionMetaData.rev,
Identity(Subject(), Namespace(invocationNamespace, uuid), BasicAuthenticationAuthKey(uuid, Secret())),
aid,
ControllerInstanceId("0"),
blocking = false,
content = None,
initArgs = Set.empty,
lockedArgs = Map.empty)
//send activation to loadbalancer
aid -> balancer.publish(actionMetaData.toExecutableWhiskAction.get, msg)
}.toMap
val activations = published.values
val ids = published.keys
//wait for activation submissions
Await.ready(Future.sequence(activations.toList), 10.seconds)
val maxActivationsPerInvoker = concurrency * maxContainers
//verify updated concurrency slots
def rem(count: Int) =
if (count % concurrency > 0) {
concurrency - (count % concurrency)
} else {
0
}
//assert available permits per invoker are as expected
var nextInvoker = home
ids.toList.grouped(maxActivationsPerInvoker).zipWithIndex.foreach { g =>
val remaining = rem(g._1.size)
val concurrentState = balancer.schedulingState._invokerSlots
.lift(nextInvoker)
.get
.concurrentState(fqn)
concurrentState.availablePermits shouldBe remaining
concurrentState.counter shouldBe g._1.size
nextInvoker = (nextInvoker + stepSize) % numInvokers
}
//complete all
val acks = ids.map { aid =>
val invoker = balancer.activationSlots(aid).invokerName
completeActivation(invoker, balancer, aid)
}
Await.ready(Future.sequence(acks), 10.seconds)
//verify invokers go back to unused state
invokers.foreach { i =>
val concurrentState = balancer.schedulingState._invokerSlots
.lift(i.id.toInt)
.get
.concurrentState
.get(fqn)
concurrentState shouldBe None
balancer.schedulingState._invokerSlots.lift(i.id.toInt).map { i =>
i.availablePermits shouldBe invokerMem.toMB
}
}
}
def completeActivation(invoker: InvokerInstanceId, balancer: ShardingContainerPoolBalancer, aid: ActivationId) = {
//complete activation
val ack =
CompletionMessage(TransactionId.testing, aid, Some(false), invoker).serialize.getBytes(StandardCharsets.UTF_8)
balancer.processAcknowledgement(ack)
}
}