blob: 243a58ec981b358ff95e155d51bce17fb18e489b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.scheduler.container.test
import java.util.concurrent.TimeUnit
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import com.ibm.etcd.client.{EtcdClient => Client}
import common.StreamLogging
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.inProgressContainer
import org.apache.openwhisk.core.scheduler.container._
import org.apache.openwhisk.core.scheduler.message._
import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, MemoryQueueValue, QueuePool}
import org.apache.openwhisk.core.service.{RegisterData, UnregisterData}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
import pureconfig.loadConfigOrThrow
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContextExecutor, Future}
@RunWith(classOf[JUnitRunner])
class CreationJobManagerTests
extends TestKit(ActorSystem("CreationJobManager"))
with ImplicitSender
with FlatSpecLike
with ScalaFutures
with Matchers
with MockFactory
with BeforeAndAfterAll
with BeforeAndAfterEach
with StreamLogging {
private val timeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetentionSecond)
val blackboxTimeout = FiniteDuration(timeout.toSeconds * 3, TimeUnit.SECONDS)
implicit val ece: ExecutionContextExecutor = system.dispatcher
val config = new WhiskConfig(ExecManifest.requiredProperties)
val creationIdTest = CreationId.generate()
val isBlackboxInvocation = false
val testInvocationNamespace = "test-invocation-namespace"
val testNamespace = "test-namespace"
val testAction = "test-action"
val schedulerHost = "127.17.0.1"
val rpcPort = 13001
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val execAction = ExecutableWhiskAction(EntityPath(testNamespace), EntityName(testAction), exec)
val execMetadata =
CodeExecMetaDataAsString(RuntimeManifest(execAction.exec.kind, ImageName("test")), entryPoint = Some("test"))
val revision = DocRevision("1-testRev")
val actionMetadata =
WhiskActionMetaData(
execAction.namespace,
execAction.name,
execMetadata,
execAction.parameters,
execAction.limits,
execAction.version,
execAction.publish,
execAction.annotations)
override def afterAll(): Unit = {
client.close()
QueuePool.clear()
TestKit.shutdownActorSystem(system)
super.afterAll()
}
override def beforeEach(): Unit = {
QueuePool.clear()
}
def feedFactory(actorRefFactory: ActorRefFactory,
topic: String,
maxActiveAcksPerPoll: Int,
handler: Array[Byte] => Future[Unit]): ActorRef = {
TestProbe().ref
}
def createRegisterMessage(action: FullyQualifiedEntityName,
revision: DocRevision,
sid: SchedulerInstanceId): RegisterCreationJob = {
val message =
ContainerCreationMessage(
TransactionId.testing,
testInvocationNamespace,
action,
revision,
actionMetadata,
sid,
schedulerHost,
rpcPort,
creationId = creationIdTest)
RegisterCreationJob(message)
}
val action = FullyQualifiedEntityName(EntityPath("test namespace"), EntityName("actionName"))
val sid = SchedulerInstanceId("0")
val iid = InvokerInstanceId(0, userMemory = 1024.MB)
val testKey = inProgressContainer(testInvocationNamespace, action, revision, sid, creationIdTest)
val memory = 256.MB
val resources = Seq.empty[String]
val resourcesStrictPolicy = true
val registerMessage = createRegisterMessage(action, revision, sid)
val client: Client = {
val hostAndPorts = "172.17.0.1:2379"
Client.forEndpoints(hostAndPorts).withPlainText().build()
}
behavior of "CreationJobManager"
it should "register creation job" in {
val probe = TestProbe()
val manager =
system.actorOf(CreationJobManager.props(feedFactory, sid, probe.ref))
manager ! registerMessage
probe.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
}
it should "skip duplicated creation job" in {
val probe = TestProbe()
val manager =
system.actorOf(CreationJobManager.props(feedFactory, sid, probe.ref))
manager ! registerMessage
manager ! registerMessage
probe.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
probe.expectNoMessage()
}
def createFinishMessage(action: FullyQualifiedEntityName,
revision: DocRevision,
memory: ByteSize,
invokerInstanceId: InvokerInstanceId,
retryCount: Int = 0,
error: Option[ContainerCreationError] = None): FinishCreationJob = {
val message =
ContainerCreationAckMessage(
TransactionId.testing,
creationIdTest,
testInvocationNamespace,
action,
revision,
actionMetadata,
invokerInstanceId,
schedulerHost,
rpcPort,
retryCount,
error)
FinishCreationJob(message)
}
def createRescheduling(finishMsg: FinishCreationJob): ReschedulingCreationJob =
ReschedulingCreationJob(
finishMsg.ack.transid,
finishMsg.ack.creationId,
finishMsg.ack.invocationNamespace,
finishMsg.ack.action,
finishMsg.ack.revision,
actionMetadata,
finishMsg.ack.schedulerHost,
finishMsg.ack.rpcPort,
finishMsg.ack.retryCount)
val normalFinish = createFinishMessage(action, revision, memory, iid, retryCount = 0)
val failedFinish =
createFinishMessage(action, revision, memory, iid, retryCount = 0, Some(ContainerCreationError.UnknownError))
val unrescheduleFinish =
createFinishMessage(action, revision, memory, iid, retryCount = 0, Some(ContainerCreationError.BlackBoxError))
val tooManyFinish =
createFinishMessage(action, revision, memory, iid, retryCount = 100, Some(ContainerCreationError.UnknownError))
it should "delete a creation job normally and send a SuccessfulCreationJob to a queue" in {
val containerManager = TestProbe()
val dataManagementService = TestProbe()
val probe = TestProbe()
val jobManager =
containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
QueuePool.put(
MemoryQueueKey(testInvocationNamespace, action.toDocId.asDocInfo(revision)),
MemoryQueueValue(probe.ref, true))
jobManager ! registerMessage
dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
jobManager ! normalFinish
dataManagementService.expectMsg(UnregisterData(testKey))
containerManager.expectMsg(
SuccessfulCreationJob(
normalFinish.ack.creationId,
normalFinish.ack.invocationNamespace,
registerMessage.msg.action,
registerMessage.msg.revision))
probe.expectMsg(
SuccessfulCreationJob(
normalFinish.ack.creationId,
normalFinish.ack.invocationNamespace,
registerMessage.msg.action,
registerMessage.msg.revision))
}
it should "only delete a creation job with failed msg after all retries are failed" in {
val containerManager = TestProbe()
val dataManagementService = TestProbe()
val jobManager =
containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
jobManager ! registerMessage
dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
jobManager ! failedFinish
containerManager.expectMsg(createRescheduling(failedFinish))
jobManager ! failedFinish.copy(ack = failedFinish.ack.copy(retryCount = 5))
dataManagementService.expectMsg(UnregisterData(testKey))
}
it should "delete a creation job with failed msg and send a FailedCreationJob to a queue" in {
val containerManager = TestProbe()
val dataManagementService = TestProbe()
val probe = TestProbe()
val jobManager =
containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
QueuePool.put(
MemoryQueueKey(testInvocationNamespace, action.toDocId.asDocInfo(revision)),
MemoryQueueValue(probe.ref, true))
jobManager ! registerMessage
dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
jobManager ! unrescheduleFinish
dataManagementService.expectMsg(UnregisterData(testKey))
containerManager.expectMsg(
FailedCreationJob(
registerMessage.msg.creationId,
registerMessage.msg.invocationNamespace,
registerMessage.msg.action,
registerMessage.msg.revision,
ContainerCreationError.BlackBoxError,
"unknown reason"))
probe.expectMsg(
FailedCreationJob(
registerMessage.msg.creationId,
registerMessage.msg.invocationNamespace,
registerMessage.msg.action,
registerMessage.msg.revision,
ContainerCreationError.BlackBoxError,
"unknown reason"))
}
it should "delete a creation job that does not exist with failed msg" in {
val containerManager = TestProbe()
val dataManagementService = TestProbe()
val jobManager =
containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
jobManager ! failedFinish.copy(ack = failedFinish.ack.copy(retryCount = 5))
dataManagementService.expectMsg(UnregisterData(testKey))
}
it should "delete a creation job with timeout" in {
val containerManager = TestProbe()
val dataManagementService = TestProbe()
val jobManager =
containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
jobManager ! registerMessage
dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
Thread.sleep(timeout.toMillis) // sleep 5s to wait for the timeout handler to be executed
dataManagementService.expectMsg(UnregisterData(testKey))
containerManager.expectMsg(
FailedCreationJob(
registerMessage.msg.creationId,
registerMessage.msg.invocationNamespace,
registerMessage.msg.action,
registerMessage.msg.revision,
ContainerCreationError.TimeoutError,
s"timeout waiting for the ack of ${registerMessage.msg.creationId} after $timeout"))
}
it should "increase the timeout if an action is a blackbox action" in {
val containerManager = TestProbe()
val dataManagementService = TestProbe()
val jobManager =
containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
val execMetadata =
BlackBoxExecMetaData(ImageName("test image"), Some("main"), native = false);
val actionMetaData = WhiskActionMetaData(
execAction.namespace,
execAction.name,
execMetadata,
execAction.parameters,
execAction.limits,
execAction.version,
execAction.publish,
execAction.annotations)
val message =
ContainerCreationMessage(
TransactionId.testing,
testInvocationNamespace,
action,
revision,
actionMetaData,
sid,
schedulerHost,
rpcPort,
creationId = creationIdTest)
val creationMsg = RegisterCreationJob(message)
jobManager ! creationMsg
dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
// no message for timeout
dataManagementService.expectNoMessage(timeout)
Thread.sleep(timeout.toMillis * 2) // timeout is doubled for blackbox actions
dataManagementService.expectMsg(UnregisterData(testKey))
containerManager.expectMsg(
FailedCreationJob(
registerMessage.msg.creationId,
registerMessage.msg.invocationNamespace,
registerMessage.msg.action,
registerMessage.msg.revision,
ContainerCreationError.TimeoutError,
s"timeout waiting for the ack of ${registerMessage.msg.creationId} after $blackboxTimeout"))
}
it should "delete a creation job with too many retry and send a FailedCreationJob to a queue" in {
val containerManager = TestProbe()
val dataManagementService = TestProbe()
val probe = TestProbe()
val jobManager =
containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, dataManagementService.ref))
QueuePool.put(
MemoryQueueKey(testInvocationNamespace, action.toDocId.asDocInfo(revision)),
MemoryQueueValue(probe.ref, true))
jobManager ! registerMessage
dataManagementService.expectMsg(RegisterData(testKey, "", failoverEnabled = false))
jobManager ! tooManyFinish
dataManagementService.expectMsg(UnregisterData(testKey))
containerManager.expectMsg(
FailedCreationJob(
registerMessage.msg.creationId,
registerMessage.msg.invocationNamespace,
registerMessage.msg.action,
registerMessage.msg.revision,
ContainerCreationError.UnknownError,
"unknown reason"))
probe.expectMsg(
FailedCreationJob(
registerMessage.msg.creationId,
registerMessage.msg.invocationNamespace,
registerMessage.msg.action,
registerMessage.msg.revision,
ContainerCreationError.UnknownError,
"unknown reason"))
}
}