blob: 05cddd83880247aa165c0ec3f23f53b7710859a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.invoker
import java.nio.charset.StandardCharsets
import akka.actor.{ActorRef, ActorSystem, Props}
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.{GracefulShutdown, Logging, TransactionId}
import org.apache.openwhisk.core.WarmUp.isWarmUpAction
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector.ContainerCreationError.DBFetchError
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool.v2.{CreationContainer, DeletionContainer}
import org.apache.openwhisk.core.database.{
ArtifactStore,
DocumentTypeMismatchException,
DocumentUnreadable,
NoDocumentException
}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.http.Messages
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
class ContainerMessageConsumer(
invokerInstanceId: InvokerInstanceId,
containerPool: ActorRef,
entityStore: ArtifactStore[WhiskEntity],
config: WhiskConfig,
msgProvider: MessagingProvider,
longPollDuration: FiniteDuration,
maxPeek: Int,
sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
implicit actorSystem: ActorSystem,
executionContext: ExecutionContext,
logging: Logging) {
private val topic = s"${Invoker.topicPrefix}invoker${invokerInstanceId.toInt}"
private val consumer =
msgProvider.getConsumer(config, topic, topic, maxPeek, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
private def handler(bytes: Array[Byte]): Future[Unit] = Future {
val raw = new String(bytes, StandardCharsets.UTF_8)
ContainerMessage.parse(raw) match {
case Success(creation: ContainerCreationMessage) if isWarmUpAction(creation.action) =>
logging.info(
this,
s"container creation message for ${creation.invocationNamespace}/${creation.action} is received (creationId: ${creation.creationId})")
feed ! MessageFeed.Processed
case Success(creation: ContainerCreationMessage) =>
implicit val transid: TransactionId = creation.transid
logging
.info(this, s"container creation message for ${creation.invocationNamespace}/${creation.action} is received")
WhiskAction
.get(entityStore, creation.action.toDocId, creation.revision, fromCache = true)
.map { action =>
containerPool ! CreationContainer(creation, action)
feed ! MessageFeed.Processed
}
.recover {
case t =>
val message = t match {
case _: NoDocumentException =>
Messages.actionRemovedWhileInvoking
case _: DocumentTypeMismatchException | _: DocumentUnreadable =>
Messages.actionMismatchWhileInvoking
case e: Throwable =>
logging.error(this, s"An unknown DB connection error occurred while fetching an action: $e.")
Messages.actionFetchErrorWhileInvoking
}
logging.error(
this,
s"failed to fetch action ${creation.invocationNamespace}/${creation.action}, error: $message (creationId: ${creation.creationId})")
val ack = ContainerCreationAckMessage(
creation.transid,
creation.creationId,
creation.invocationNamespace,
creation.action,
creation.revision,
creation.whiskActionMetaData,
invokerInstanceId,
creation.schedulerHost,
creation.rpcPort,
creation.retryCount,
Some(DBFetchError),
Some(message))
sendAckToScheduler(creation.rootSchedulerIndex, ack)
feed ! MessageFeed.Processed
}
case Success(deletion: ContainerDeletionMessage) =>
implicit val transid: TransactionId = deletion.transid
logging.info(this, s"deletion message for ${deletion.invocationNamespace}/${deletion.action} is received")
containerPool ! DeletionContainer(deletion)
feed ! MessageFeed.Processed
case Failure(t) =>
logging.error(this, s"Failed to parse $bytes, error: ${t.getMessage}")
feed ! MessageFeed.Processed
case _ =>
logging.error(this, s"Unexpected message received $raw")
feed ! MessageFeed.Processed
}
}
private val feed = actorSystem.actorOf(Props {
new MessageFeed("containerCreation", logging, consumer, maxPeek, longPollDuration, handler)
})
def close(): Unit = {
feed ! GracefulShutdown
}
}