// blob: 3177cb76d8aca01f211bf7f324a46a6a8a77489a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.invoker
import java.nio.charset.StandardCharsets
import java.time.Instant
import akka.Done
import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.server.Directives.complete
import akka.http.scaladsl.server.Route
import org.apache.openwhisk.common._
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.database.{UserContext, _}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.spi.SpiLoader
import pureconfig._
import pureconfig.generic.auto._
import spray.json._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
/**
 * [[InvokerProvider]] implementation that produces [[InvokerReactive]] instances.
 */
object InvokerReactive extends InvokerProvider {

  /**
   * Builds the invoker core by forwarding every argument, unchanged, to the
   * [[InvokerReactive]] constructor. The actor system and the logger are passed
   * along implicitly.
   */
  override def instance(
    config: WhiskConfig,
    instance: InvokerInstanceId,
    producer: MessageProducer,
    poolConfig: ContainerPoolConfig,
    limitsConfig: ConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore = {
    new InvokerReactive(
      config = config,
      instance = instance,
      producer = producer,
      poolConfig = poolConfig,
      limitsConfig = limitsConfig)
  }
}
class InvokerReactive(
config: WhiskConfig,
instance: InvokerInstanceId,
producer: MessageProducer,
poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool),
limitsConfig: ConcurrencyLimitConfig = loadConfigOrThrow[ConcurrencyLimitConfig](ConfigKeys.concurrencyLimit))(
implicit actorSystem: ActorSystem,
logging: Logging)
extends InvokerCore {
// Execution context for the futures created in this class: the actor system's dispatcher.
implicit val ec: ExecutionContext = actorSystem.dispatcher
// Makes the invoker configuration available as an implicit value
// (NOTE(review): presumably consumed by the datastore/container factories - confirm).
implicit val cfg: WhiskConfig = config
// Log store implementation, resolved through the SPI loader; its class is logged for diagnostics.
private val logsProvider = SpiLoader.get[LogStoreProvider].instance(actorSystem)
logging.info(this, s"LogStoreProvider: ${logsProvider.getClass}")
/**
 * Factory the ContainerProxy relies on to physically create a new container.
 *
 * The factory is created and initialized ahead of any other task or actor,
 * since nothing else can sensibly run when this step goes wrong.
 * Initialization throws an exception upon failure.
 */
private val containerFactory = {
  val factoryProvider = SpiLoader.get[ContainerFactoryProvider]
  // Parameters handed to the factory in addition to the ones contributed by the log store provider.
  val baseParameters = Map(
    "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"),
    "--ulimit" -> Set("nofile=1024:1024"),
    "--pids-limit" -> Set("1024"))
  factoryProvider.instance(
    actorSystem,
    logging,
    config,
    instance,
    baseParameters ++ logsProvider.containerParameters)
}
containerFactory.init()
// Registers a shutdown task, in the phase right before the actor system terminates,
// that asks the container factory to clean up and then reports completion.
CoordinatedShutdown(actorSystem).addTask(
  CoordinatedShutdown.PhaseBeforeActorSystemTerminate,
  "cleanup runtime containers")(() => {
  containerFactory.cleanup()
  Future.successful(Done)
})
/**
 * Initialize needed databases.
 *
 * The entity store is the source the actions are fetched from in `handleActivationMessage`.
 */
private val entityStore = WhiskEntityStore.datastore()
// Activation store, obtained from the ActivationStoreProvider resolved through the SPI loader.
private val activationStore =
SpiLoader.get[ActivationStoreProvider].instance(actorSystem, logging)
// Auth store; within this class it is only used to build the namespace blacklist.
private val authStore = WhiskAuthStore.datastore()
// Blacklist of namespaces, backed by the auth store.
private val namespaceBlacklist = new NamespaceBlacklist(authStore)
// Background job that refreshes the namespace blacklist; the interval is read from the
// blacklist configuration. The outcome of each refresh is only logged.
Scheduler.scheduleWaitAtMost(loadConfigOrThrow[NamespaceBlacklistConfig](ConfigKeys.blacklist).pollInterval)(() => {
  logging.debug(this, "running background job to update blacklist")
  val refreshed = namespaceBlacklist.refreshBlacklist()(ec, TransactionId.invoker)
  refreshed.andThen {
    case Failure(t)   => logging.error(this, s"error on updating the blacklist: ${t.getMessage}")
    case Success(set) => logging.info(this, s"updated blacklist to ${set.size} entries")
  }
})
/** Initialize message consumers */
// Topic this invoker reads from: the invoker topic prefix, the word "invoker" and the instance number.
private val topic = s"${Invoker.topicPrefix}invoker${instance.toInt}"
// The pool's user memory divided by MemoryLimit.MIN_MEMORY, truncated to an Int.
private val maximumContainers = (poolConfig.userMemory / MemoryLimit.MIN_MEMORY).toInt
// Messaging implementation, resolved through the SPI loader.
private val msgProvider = SpiLoader.get[MessagingProvider]
// Number of peeked messages. Increasing the concurrentPeekFactor improves concurrent usage,
// but adds risk of message loss in case of a crash. The value is never lower than maximumContainers.
private val maxPeek =
math.max(maximumContainers, (maximumContainers * limitsConfig.max * poolConfig.concurrentPeekFactor).toInt)
// The topic name is passed twice (NOTE(review): presumably once as consumer group id and once
// as topic - confirm against MessagingProvider.getConsumer). The maximum poll interval is the
// maximum action time limit plus one minute.
private val consumer =
msgProvider.getConsumer(config, topic, topic, maxPeek, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
// Feed actor built on the consumer above; processActivationMessage is registered as its message handler.
private val activationFeed = actorSystem.actorOf(Props {
new MessageFeed("activation", logging, consumer, maxPeek, 1.second, processActivationMessage)
})
// Active-ack implementation that sends through the messaging producer.
private val ack = {
  // An event sender is only handed over when user events are enabled.
  val eventSender =
    if (UserEvents.enabled) Some(new UserEventSender(producer))
    else None
  new MessagingActiveAck(producer, instance, eventSender)
}
// Log collector built on the configured log store provider.
private val collectLogs = new LogStoreCollector(logsProvider)
/**
 * Stores an activation in the database.
 *
 * Delegates to the activation store's `storeAfterCheck`. The transaction id, the (absent)
 * cache change notifier and the logger are all passed explicitly, so no implicit
 * transaction id has to be brought into scope here.
 *
 * Arguments: the transaction id, the activation to store, whether the invocation was
 * blocking, and the user context of the invoking user.
 */
private val store = (tid: TransactionId, activation: WhiskActivation, isBlocking: Boolean, context: UserContext) =>
  activationStore.storeAfterCheck(activation, isBlocking, None, context)(tid, notifier = None, logging)
/**
 * Creates a ContainerProxy actor each time it is called, using the given actor factory.
 * The Props are built anew on every call.
 */
private val childFactory = (refFactory: ActorRefFactory) => {
  val proxyProps =
    ContainerProxy.props(containerFactory.createContainer, ack, store, collectLogs, instance, poolConfig)
  refFactory.actorOf(proxyProps)
}
/**
 * Prewarming configurations derived from the stem cells declared in the runtimes manifest:
 * one entry per stem cell, each with an empty code string for its runtime manifest.
 */
val prewarmingConfigs: List[PrewarmingConfig] = {
  val configs = for {
    (manifest, stemCells) <- ExecManifest.runtimesManifest.stemcells
    stemCell <- stemCells
  } yield
    PrewarmingConfig(
      stemCell.initialCount,
      new CodeExecAsString(manifest, "", None),
      stemCell.memory,
      stemCell.reactive)
  configs.toList
}
// Container pool actor; it receives the proxy factory, the pool configuration,
// the activation feed and the prewarming configurations.
private val pool = {
  val poolProps = ContainerPool.props(childFactory, poolConfig, activationFeed, prewarmingConfigs)
  actorSystem.actorOf(poolProps)
}
/**
 * Fetches the action referenced by an activation message and hands it to the container pool.
 *
 * When the fetch fails with a revision mismatch, the message is handled again without a
 * revision. Every other failure (including an action that is not executable) takes the
 * fallback path: the feed is told the message was processed, an active-ack carrying a
 * fallback activation is sent, and that activation is stored.
 *
 * @param msg the activation message to handle
 * @param transid the transaction id used for logging and for the completion message
 * @return a future completing with Unit once the action was sent to the pool or the
 *         fallback path has been triggered
 */
def handleActivationMessage(msg: ActivationMessage)(implicit transid: TransactionId): Future[Unit] = {
  val docInfo = FullyQualifiedEntityName(msg.action.path, msg.action.name).toDocId.asDocInfo(msg.revision)
  logging.debug(this, s"${docInfo.id} ${msg.user.subject} ${msg.activationId}")

  // Caching is enabled because actions carry a revision id, and an updated action
  // misses the cache since its revision id changed. Without a revision the cache is bypassed.
  val revisionKnown = docInfo.rev != DocRevision.empty
  if (!revisionKnown) logging.warn(this, s"revision was not provided for ${docInfo.id}")

  // Fallback path: release the feed slot, then ack and store an activation describing the failure.
  def completeWithFallback(cause: Throwable): Future[Unit] = {
    val response = cause match {
      case _: NoDocumentException =>
        ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
      case _: DocumentTypeMismatchException | _: DocumentUnreadable =>
        ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking)
      case _ =>
        ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking)
    }

    activationFeed ! MessageFeed.Processed

    val fallback = generateFallbackActivation(msg, response)
    ack(
      msg.transid,
      fallback,
      msg.blocking,
      msg.rootControllerIndex,
      msg.user.namespace.uuid,
      CombinedCompletionAndResultMessage(transid, fallback, instance))
    store(msg.transid, fallback, msg.blocking, UserContext(msg.user))
    Future.successful(())
  }

  val dispatched: Future[Unit] = WhiskAction
    .get(entityStore, docInfo.id, docInfo.rev, fromCache = revisionKnown)
    .flatMap { action =>
      action.toExecutableWhiskAction match {
        case None =>
          logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}")
          Future.failed(new IllegalStateException("non-executable action reached the invoker"))
        case Some(executable) =>
          pool ! Run(executable, msg)
          Future.successful(())
      }
    }

  dispatched.recoverWith {
    case DocumentRevisionMismatchException(_) =>
      // The action may have been updated in the meantime, so try again with the latest code.
      handleActivationMessage(msg.copy(revision = DocRevision.empty))
    case other =>
      completeWithFallback(other)
  }
}
/**
 * Is called when an ActivationMessage is read from Kafka.
 *
 * The raw bytes are decoded as UTF-8 and parsed. A successfully parsed message is either
 * passed on to `handleActivationMessage` or, when the invoking user is blacklisted,
 * answered with an active-ack only. Any failure along the way is logged and the feed is
 * told the message was processed.
 *
 * @param bytes the serialized activation message
 * @return a future completing with Unit once the message has been dispatched or rejected
 */
def processActivationMessage(bytes: Array[Byte]): Future[Unit] = {
  Future(ActivationMessage.parse(new String(bytes, StandardCharsets.UTF_8)))
    .flatMap(Future.fromTry)
    .flatMap { msg =>
      // The message has been parsed correctly, thus the following code needs to *always* produce at least an
      // active-ack.
      implicit val transid: TransactionId = msg.transid

      // Set the trace context to continue tracing.
      WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext)

      if (!namespaceBlacklist.isBlacklisted(msg.user)) {
        // Called for its effect only (presumably emits the activation start marker at info
        // level - confirm against TransactionId.started); the returned value is not needed here.
        transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel)
        handleActivationMessage(msg)
      } else {
        // If the current namespace is blacklisted, an active-ack is produced only to keep the
        // loadbalancer protocol intact. Due to the protective nature of the blacklist, a database
        // entry is not written.
        activationFeed ! MessageFeed.Processed

        val activation =
          generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted))
        ack(
          msg.transid,
          activation,
          false,
          msg.rootControllerIndex,
          msg.user.namespace.uuid,
          CombinedCompletionAndResultMessage(transid, activation, instance))

        logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.")
        Future.successful(())
      }
    }
    .recoverWith {
      case t =>
        // If everything above failed, we have a terminal error at hand. Either the message failed
        // to deserialize, or something threw an error where it is not expected to throw.
        activationFeed ! MessageFeed.Processed
        logging.error(this, s"terminal failure while processing message: $t")
        Future.successful(())
    }
}
/**
 * Builds an activation with zero runtime; usually used for error cases.
 *
 * Start and end carry the same timestamp. The kind annotation is set to `Exec.UNKNOWN`
 * since the kind is not known to the invoker because the action fetch failed.
 *
 * @param msg the activation message the activation is generated for
 * @param response the response to record in the activation
 * @return the fallback activation
 */
private def generateFallbackActivation(msg: ActivationMessage, response: ActivationResponse): WhiskActivation = {
  val timestamp = Instant.now

  val pathAnnotation =
    Parameters(WhiskActivation.pathAnnotation, JsString(msg.action.copy(version = None).asString))
  val kindAnnotation = Parameters(WhiskActivation.kindAnnotation, JsString(Exec.UNKNOWN))
  // Present only when the activation was caused by a sequence.
  val causedByAnnotation =
    if (msg.causedBySequence) Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE)))
    else None

  WhiskActivation(
    activationId = msg.activationId,
    namespace = msg.user.namespace.name.toPath,
    subject = msg.user.subject,
    cause = msg.cause,
    name = msg.action.name,
    version = msg.action.version.getOrElse(SemVer()),
    start = timestamp,
    end = timestamp,
    duration = Some(0),
    response = response,
    annotations = pathAnnotation ++ kindAnnotation ++ causedByAnnotation)
}
// Producer used for the health pings below.
private val healthProducer = msgProvider.getProducer(config)
// Sends a ping for this invoker instance to the health topic on a one-second schedule;
// a failed send is only logged.
Scheduler.scheduleWaitAtMost(1.seconds) { () =>
  val ping = healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance))
  ping.andThen {
    case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
  }
}
/** Enabling is not supported by this invoker implementation; the route answers "not supported". */
override def enable(): Route = complete("not supported")
/** Disabling is not supported by this invoker implementation; the route answers "not supported". */
override def disable(): Route = complete("not supported")
}