blob: 89abe7bb946749c95d39d7d3b6c54e7af94a32f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.invoker.test
import java.nio.charset.StandardCharsets
import akka.actor.ActorSystem
import akka.testkit.{TestKit, TestProbe}
import common.StreamLogging
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.{WarmUp, WhiskConfig}
import org.apache.openwhisk.core.connector.ContainerCreationError._
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.connector.test.TestConnector
import org.apache.openwhisk.core.containerpool.v2.CreationContainer
import org.apache.openwhisk.core.database.test.DbUtils
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.entity.test.ExecHelpers
import org.apache.openwhisk.core.invoker.ContainerMessageConsumer
import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.utils.{retry => utilRetry}
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
@RunWith(classOf[JUnitRunner])
class ContainerMessageConsumerTests
extends TestKit(ActorSystem("ContainerMessageConsumer"))
with FlatSpecLike
with Matchers
with BeforeAndAfterEach
with BeforeAndAfterAll
with StreamLogging
with MockFactory
with DbUtils
with ExecHelpers {
implicit val actualActorSystem = system // Use system for duplicate system and actorSystem.
implicit val ec = actualActorSystem.dispatcher
implicit val transId = TransactionId.testing
implicit val creationId = CreationId.generate()
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
super.afterAll()
}
private val whiskConfig = new WhiskConfig(
Map(
WhiskConfig.actionInvokePerMinuteLimit -> null,
WhiskConfig.triggerFirePerMinuteLimit -> null,
WhiskConfig.actionInvokeConcurrentLimit -> null,
WhiskConfig.runtimesManifest -> null,
WhiskConfig.actionSequenceMaxLimit -> null))
private val entityStore = WhiskEntityStore.datastore()
private val producer = stub[MessageProducer]
private val defaultUserMemory: ByteSize = 1024.MB
private val invokerInstance = InvokerInstanceId(0, userMemory = defaultUserMemory)
private val schedulerInstanceId = SchedulerInstanceId("0")
private val invocationNamespace = EntityName("invocationSpace")
private val schedulerHost = "127.17.0.1"
private val rpcPort = 13001
override def afterEach(): Unit = {
cleanup()
}
private def fakeMessageProvider(consumer: TestConnector): MessagingProvider = {
new MessagingProvider {
override def getConsumer(
whiskConfig: WhiskConfig,
groupId: String,
topic: String,
maxPeek: Int,
maxPollInterval: FiniteDuration)(implicit logging: Logging, actorSystem: ActorSystem): MessageConsumer =
consumer
override def getProducer(config: WhiskConfig, maxRequestSize: Option[ByteSize])(
implicit logging: Logging,
actorSystem: ActorSystem): MessageProducer = consumer.getProducer()
override def ensureTopic(config: WhiskConfig,
topic: String,
topicConfig: String,
maxMessageBytes: Option[ByteSize])(implicit logging: Logging): Try[Unit] = Try {}
}
}
def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: SchedulerInstanceId,
ackMessage: ContainerCreationAckMessage): Future[RecordMetadata] = {
val topic = s"creationAck${schedulerInstanceId.asString}"
producer.send(topic, ackMessage)
}
private def createAckMsg(creationMessage: ContainerCreationMessage,
error: Option[ContainerCreationError],
reason: Option[String]) = {
ContainerCreationAckMessage(
creationMessage.transid,
creationMessage.creationId,
creationMessage.invocationNamespace,
creationMessage.action,
creationMessage.revision,
creationMessage.whiskActionMetaData,
invokerInstance,
creationMessage.schedulerHost,
creationMessage.rpcPort,
creationMessage.retryCount,
error,
reason)
}
it should "forward ContainerCreationMessage to containerPool" in {
val pool = TestProbe()
val mockConsumer = new TestConnector("fakeTopic", 4, true)
val msgProvider = fakeMessageProvider(mockConsumer)
val consumer =
new ContainerMessageConsumer(
invokerInstance,
pool.ref,
entityStore,
whiskConfig,
msgProvider,
200.milliseconds,
500,
sendAckToScheduler(producer))
val exec = CodeExecAsString(RuntimeManifest("nodejs:10", ImageName("testImage")), "testCode", None)
val action =
WhiskAction(EntityPath("testns"), EntityName("testAction"), exec, limits = ActionLimits(TimeLimit(1.minute)))
put(entityStore, action)
val execMetadata =
CodeExecMetaDataAsString(exec.manifest, entryPoint = exec.entryPoint)
val actionMetadata =
WhiskActionMetaData(
action.namespace,
action.name,
execMetadata,
action.parameters,
action.limits,
action.version,
action.publish,
action.annotations)
val msg =
ContainerCreationMessage(
transId,
invocationNamespace.asString,
action.fullyQualifiedName(true),
DocRevision.empty,
actionMetadata,
schedulerInstanceId,
schedulerHost,
rpcPort,
creationId = creationId)
mockConsumer.send(msg)
pool.expectMsgPF() {
case CreationContainer(_, _) => true
}
}
it should "send ack(failed) to scheduler when failed to get action from DB " in {
val pool = TestProbe()
val creationConsumer = new TestConnector("creation", 4, true)
val msgProvider = fakeMessageProvider(creationConsumer)
val ackTopic = "ack"
val ackConsumer = new TestConnector(ackTopic, 4, true)
val consumer =
new ContainerMessageConsumer(
invokerInstance,
pool.ref,
entityStore,
whiskConfig,
msgProvider,
200.milliseconds,
500,
sendAckToScheduler(ackConsumer.getProducer()))
val exec = CodeExecAsString(RuntimeManifest("nodejs:10", ImageName("testImage")), "testCode", None)
val whiskAction =
WhiskAction(EntityPath("testns"), EntityName("testAction2"), exec, limits = ActionLimits(TimeLimit(1.minute)))
val execMetadata =
CodeExecMetaDataAsString(exec.manifest, entryPoint = exec.entryPoint)
val actionMetadata =
WhiskActionMetaData(
whiskAction.namespace,
whiskAction.name,
execMetadata,
whiskAction.parameters,
whiskAction.limits,
whiskAction.version,
whiskAction.publish,
whiskAction.annotations)
val creationMessage =
ContainerCreationMessage(
transId,
invocationNamespace.asString,
whiskAction.fullyQualifiedName(true),
DocRevision.empty,
actionMetadata,
schedulerInstanceId,
schedulerHost,
rpcPort,
creationId = creationId)
// action doesn't exist
val ackMessage = createAckMsg(creationMessage, Some(DBFetchError), Some(Messages.actionRemovedWhileInvoking))
creationConsumer.send(creationMessage)
within(5.seconds) {
utilRetry({
val buffer = ackConsumer.peek(50.millisecond)
buffer.size shouldBe 1
buffer.head._1 shouldBe ackTopic
new String(buffer.head._4, StandardCharsets.UTF_8) shouldBe ackMessage.serialize
}, 10, Some(500.millisecond))
pool.expectNoMessage(2.seconds)
}
// action exist but version mismatch
put(entityStore, whiskAction)
val actualCreationMessage = creationMessage.copy(revision = DocRevision("1-fake"))
val fetchErrorAckMessage =
createAckMsg(actualCreationMessage, Some(DBFetchError), Some(Messages.actionFetchErrorWhileInvoking))
creationConsumer.send(actualCreationMessage)
within(5.seconds) {
utilRetry({
val buffer2 = ackConsumer.peek(50.millisecond)
buffer2.size shouldBe 1
buffer2.head._1 shouldBe ackTopic
new String(buffer2.head._4, StandardCharsets.UTF_8) shouldBe fetchErrorAckMessage.serialize
}, 10, Some(500.millisecond))
pool.expectNoMessage(2.seconds)
}
}
it should "drop messages of warm-up action" in {
val pool = TestProbe()
val mockConsumer = new TestConnector("fakeTopic", 4, true)
val msgProvider = fakeMessageProvider(mockConsumer)
val consumer =
new ContainerMessageConsumer(
invokerInstance,
pool.ref,
entityStore,
whiskConfig,
msgProvider,
200.milliseconds,
500,
sendAckToScheduler(producer))
val exec = CodeExecAsString(RuntimeManifest("nodejs:10", ImageName("testImage")), "testCode", None)
val action =
WhiskAction(
WarmUp.warmUpAction.namespace.toPath,
WarmUp.warmUpAction.name,
exec,
limits = ActionLimits(TimeLimit(1.minute)))
val doc = put(entityStore, action)
val execMetadata =
CodeExecMetaDataAsString(exec.manifest, entryPoint = exec.entryPoint)
val actionMetadata =
WhiskActionMetaData(
action.namespace,
action.name,
execMetadata,
action.parameters,
action.limits,
action.version,
action.publish,
action.annotations)
val msg =
ContainerCreationMessage(
transId,
invocationNamespace.asString,
action.fullyQualifiedName(false),
DocRevision.empty,
actionMetadata,
schedulerInstanceId,
schedulerHost,
rpcPort,
creationId = creationId)
mockConsumer.send(msg)
pool.expectNoMessage(1.seconds)
}
}