| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.openwhisk.core.scheduler.queue |
| |
| import akka.actor.Status.{Failure => FailureMessage} |
| import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash} |
| import akka.util.Timeout |
| import org.apache.openwhisk.common._ |
| import org.apache.openwhisk.common.time.{Clock, SystemClock} |
| import org.apache.openwhisk.core.ConfigKeys |
| import org.apache.openwhisk.core.ack.ActiveAck |
| import org.apache.openwhisk.core.connector.ContainerCreationError.ZeroNamespaceLimit |
| import org.apache.openwhisk.core.connector._ |
| import org.apache.openwhisk.core.containerpool.Interval |
| import org.apache.openwhisk.core.database.{NoDocumentException, UserContext} |
| import org.apache.openwhisk.core.entity._ |
| import org.apache.openwhisk.core.entity.size._ |
| import org.apache.openwhisk.core.etcd.EtcdClient |
| import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix |
| import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys} |
| import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, ActivationResponse => GetActivationResponse} |
| import org.apache.openwhisk.core.scheduler.message.{ |
| ContainerCreation, |
| ContainerDeletion, |
| FailedCreationJob, |
| SuccessfulCreationJob |
| } |
| import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig} |
| import org.apache.openwhisk.core.service._ |
| import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests} |
| import pureconfig.loadConfigOrThrow |
| import spray.json._ |
| import pureconfig.generic.auto._ |
| |
| import java.time.{Duration, Instant} |
| import java.util.concurrent.atomic.AtomicInteger |
| import scala.annotation.tailrec |
| import scala.collection.immutable.Queue |
| import scala.collection.mutable |
| import scala.concurrent.duration._ |
| import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise} |
| import scala.util.{Failure, Success} |
| |
// States
/** The states of the `MemoryQueue` finite state machine, listed roughly in lifecycle order. */
sealed trait MemoryQueueState

/** Start state: only `Start` and `VersionUpdated` are handled, every other message is stashed. */
case object Uninitialized extends MemoryQueueState

/** Regular operating state; falls back to `Idle` when the idle grace elapses with nothing queued and no containers. */
case object Running extends MemoryQueueState

/** Entered from `Running` when namespace throttling is enabled; left again when it is disabled. */
case object NamespaceThrottled extends MemoryQueueState

/** Incoming activation messages are completed with the `tooManyConcurrentRequests` error. */
case object ActionThrottled extends MemoryQueueState

/** Monitoring actors are stopped; a new activation goes back to `Running`, the stop grace timeout removes the queue. */
case object Idle extends MemoryQueueState

/** Entered from `Running` when a container creation fails while no container exists. */
case object Flushing extends MemoryQueueState

/** The queue is shutting down but waits until queued activations and buffered requests are gone. */
case object Removing extends MemoryQueueState

/** Last state before the actor stops; activation messages are sent back to the parent. */
case object Removed extends MemoryQueueState
| |
// Data
/** Base type of the state data carried by the `MemoryQueue` finite state machine. */
sealed abstract class MemoryQueueData

/** Empty data, used in the `Uninitialized` and `Removed` states. */
case class NoData() extends MemoryQueueData

/** Data of the `Idle` state: the monitoring actors have been stopped. */
case class NoActors() extends MemoryQueueData

/** Data of the `Running` state: the two monitoring actors started by the queue. */
case class RunningData(schedulerActor: ActorRef, droppingActor: ActorRef) extends MemoryQueueData

/** Data of the throttled states; carries the same two monitoring actors as [[RunningData]]. */
case class ThrottledData(schedulerActor: ActorRef, droppingActor: ActorRef) extends MemoryQueueData

/**
 * Data of the `Flushing` state.
 *
 * @param schedulerActor    monitoring actor carried over from the previous state
 * @param droppingActor     monitoring actor carried over from the previous state
 * @param error             error of the most recent failed creation job
 * @param reason            message of the most recent failed creation job
 * @param activeDuringFlush set when an activation arrived since the last flush timer tick
 */
case class FlushingData(
  schedulerActor: ActorRef,
  droppingActor: ActorRef,
  error: ContainerCreationError,
  reason: String,
  activeDuringFlush: Boolean = false)
    extends MemoryQueueData

/**
 * Data of the `Removing` state.
 *
 * @param outdated true when the removal was caused by `StopSchedulingAsOutdated`
 */
case class RemovingData(
  schedulerActor: ActorRef,
  droppingActor: ActorRef,
  outdated: Boolean)
    extends MemoryQueueData
| |
// Events sent by the actor
/**
 * Tells the parent that this queue is no longer in charge of the action.
 * `leaderKey` is `None` when the queue is removed because it is outdated.
 */
case class QueueRemoved(
  invocationNamespace: String,
  action: DocInfo,
  leaderKey: Option[String])

/** Carries the namespace, the action name and its document info of a reactivated queue. */
case class QueueReactivated(
  invocationNamespace: String,
  action: FullyQualifiedEntityName,
  docInfo: DocInfo)

/** Asks the queue to complete `promise` with `NoActivationMessage` unless it is already completed. */
case class CancelPoll(promise: Promise[Either[MemoryQueueError, ActivationMessage]])

/** Signals that the removal of the queue is finished, so the actor can stop. */
case object QueueRemovedCompleted
| |
// Events received by the actor
/** Boots a freshly created queue (handled in the `Uninitialized` state). */
case object Start

/** Boots a queue that was created because the action version changed. */
case object VersionUpdated

/** Tells the queue that its action version is outdated and it must stop scheduling. */
case object StopSchedulingAsOutdated
| |
/** What the queue is asked to do as the outcome of a scheduling decision. */
sealed trait RequiredAction

/** Nothing to do. */
case object Skip extends RequiredAction

/** Create the first container(s); handling it marks the queue as initialized. */
case object AddInitialContainer extends RequiredAction

/** Create additional container(s). */
case object AddContainer extends RequiredAction

/** Enable namespace throttling; with `dropMsg` set, all queued activations are completed with an error. */
case class EnableNamespaceThrottling(dropMsg: Boolean) extends RequiredAction

/** Disable namespace throttling. */
case object DisableNamespaceThrottling extends RequiredAction

/** Enable action throttling. */
case object EnableActionThrottling extends RequiredAction

/** Disable action throttling. */
case object DisableActionThrottling extends RequiredAction

/** The limit is not positive; the queue reacts by sending itself a failed creation job. */
case object Pausing extends RequiredAction

/**
 * A scheduling decision delivered to the queue.
 *
 * @param required the action to take
 * @param num      the number of containers to create, where applicable
 */
case class DecisionResults(required: RequiredAction, num: Int)
| |
/**
 * An activation message kept in the queue.
 *
 * @param timestamp the instant at which the message was enqueued
 * @param msg       the queued activation message
 */
case class TimeSeriesActivationEntry(
  timestamp: Instant,
  msg: ActivationMessage)
| |
| class MemoryQueue(private val etcdClient: EtcdClient, |
| private val durationChecker: DurationChecker, |
| private val action: FullyQualifiedEntityName, |
| messagingProducer: MessageProducer, |
| schedulingConfig: SchedulingConfig, |
| invocationNamespace: String, |
| revision: DocRevision, |
| endpoints: SchedulerEndpoints, |
| actionMetaData: WhiskActionMetaData, |
| dataManagementService: ActorRef, |
| watcherService: ActorRef, |
| containerManager: ActorRef, |
| decisionMaker: ActorRef, |
| schedulerId: SchedulerInstanceId, |
| ack: ActiveAck, |
| store: (TransactionId, WhiskActivation, UserContext) => Future[Any], |
| getUserLimit: String => Future[Int], |
| checkToDropStaleActivation: (Clock, |
| Queue[TimeSeriesActivationEntry], |
| Long, |
| String, |
| WhiskActionMetaData, |
| MemoryQueueState, |
| ActorRef) => Unit, |
| queueConfig: QueueConfig)(implicit logging: Logging, clock: Clock) |
| extends FSM[MemoryQueueState, MemoryQueueData] |
| with Stash { |
| |
// implicit context shared by the handlers of this actor
private implicit val ec: ExecutionContextExecutor = context.dispatcher
private implicit val actorSystem: ActorSystem = context.system
private implicit val timeout: Timeout = Timeout(5.seconds)
// buffered requests are ordered by their container id
private implicit val order: Ordering[BufferedRequest] = Ordering.by(request => request.containerId)

private val StaleDuration = Duration.ofMillis(schedulingConfig.staleThreshold.toMillis)

// etcd keys owned or watched by this queue
private val unversionedAction = action.copy(version = None)
private val leaderKey = QueueKeys.queue(invocationNamespace, unversionedAction, leader = true)
private val inProgressContainerPrefixKey =
  containerPrefix(ContainerKeys.inProgressPrefix, invocationNamespace, action, Some(revision))
private val existingContainerPrefixKey =
  containerPrefix(ContainerKeys.namespacePrefix, invocationNamespace, action, Some(revision))
private val namespaceThrottlingKey = ThrottlingKeys.namespace(EntityName(invocationNamespace))
private val actionThrottlingKey = ThrottlingKeys.action(invocationNamespace, unversionedAction)

private val pollTimeOut = 1.seconds
private var requestBuffer = mutable.PriorityQueue.empty[BufferedRequest]
private val memory = actionMetaData.limits.memory.megabytes.MB

// removal notifications for the parent; the stale variant carries no leader key
private val queueRemovedMsg =
  QueueRemoved(invocationNamespace, action.toDocId.asDocInfo(revision), Some(leaderKey))
private val staleQueueRemovedMsg =
  QueueRemoved(invocationNamespace, action.toDocId.asDocInfo(revision), None)
private val actionRetentionTimeout = MemoryQueue.getRetentionTimeout(actionMetaData, queueConfig)

// ids of existing containers and of container creations still in progress
private[queue] var containers = Set.empty[String]
private[queue] var creationIds = Set.empty[String]

// pending activations together with their enqueue time
private[queue] var queue = Queue.empty[TimeSeriesActivationEntry]
private[queue] var in = new AtomicInteger(0)
private[queue] val namespaceContainerCount =
  NamespaceContainerCount(invocationNamespace, etcdClient, watcherService)
private[queue] var averageDuration: Option[Double] = None
private[queue] var averageDurationBuffer = AverageRingBuffer(queueConfig.durationBufferSize)
private[queue] var limit: Option[Int] = None
// set to true once an AddInitialContainer decision has been handled
private[queue] var initialized = false
| |
// Emits the gauge metrics of this queue with a fixed delay of one second, starting immediately.
// The returned handle is cancelled when the FSM terminates.
private val logScheduler: Cancellable = {
  val initialDelay = 0.seconds
  val emitInterval = 1.seconds

  context.system.scheduler.scheduleWithFixedDelay(initialDelay, emitInterval) { () =>
    // activations waiting in this queue
    val waitingMarker = LoggingMarkers.SCHEDULER_QUEUE_WAITING_ACTIVATION(s"$invocationNamespace/$action")
    MetricEmitter.emitGaugeMetric(waitingMarker, queue.size)

    // containers of the whole namespace
    val namespaceContainerMarker = LoggingMarkers.SCHEDULER_NAMESPACE_CONTAINER(invocationNamespace)
    MetricEmitter.emitGaugeMetric(namespaceContainerMarker, namespaceContainerCount.existingContainerNumByNamespace)
    val namespaceInProgressMarker = LoggingMarkers.SCHEDULER_NAMESPACE_INPROGRESS_CONTAINER(invocationNamespace)
    MetricEmitter.emitGaugeMetric(namespaceInProgressMarker, namespaceContainerCount.inProgressContainerNumByNamespace)

    // containers of this action only
    val actionContainerMarker = LoggingMarkers.SCHEDULER_ACTION_CONTAINER(invocationNamespace, action.asString)
    MetricEmitter.emitGaugeMetric(actionContainerMarker, containers.size)
    val actionInProgressMarker =
      LoggingMarkers.SCHEDULER_ACTION_INPROGRESS_CONTAINER(invocationNamespace, action.asString)
    MetricEmitter.emitGaugeMetric(actionInProgressMarker, creationIds.size)
  }
}
| |
// NOTE(review): getAverageDuration is defined outside this part of the file
getAverageDuration()

private val watcherName = s"memory-queue-$action-$revision"
// key prefixes of the in-progress and the existing containers of this action revision
private val watchedKeys = Seq(inProgressContainerPrefixKey, existingContainerPrefixKey)

// watch both prefixes for put and delete events
for (prefixKey <- watchedKeys) {
  watcherService ! WatchEndpoint(prefixKey, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
}

// the FSM starts without data and waits for Start or VersionUpdated
startWith(Uninitialized, NoData())
| |
when(Uninitialized) {
  // a brand-new queue: start monitoring, register the throttling keys and watch the leader key
  case Event(Start, _) =>
    logging.info(
      this,
      s"[$invocationNamespace:$action:$stateName] a new queue is created, retentionTimeout: $actionRetentionTimeout, kind: ${actionMetaData.exec.kind}.")
    val monitors = startMonitoring()
    initializeThrottling()

    val watchLeaderKey =
      WatchEndpoint(leaderKey, endpoints.serialize, isPrefix = false, watcherName, Set(DeleteEvent))
    watcherService ! watchLeaderKey

    goto(Running) using RunningData(monitors._1, monitors._2)

  // the action version was updated, so no data needs to be stored
  case Event(VersionUpdated, _) =>
    val monitors = startMonitoring()

    goto(Running) using RunningData(monitors._1, monitors._2)

  // anything else is kept until the queue has left this state
  case _ =>
    stash()
    stay()
}
| |
when(Running, stateTimeout = queueConfig.idleGrace) {
  // namespace throttling is requested; optionally drop everything that is queued
  case Event(EnableNamespaceThrottling(dropMsg), RunningData(scheduler, dropper)) =>
    logging.info(this, s"[$invocationNamespace:$action:$stateName] Enable namespace throttling.")
    enableNamespaceThrottling()

    if (dropMsg) {
      completeAllActivations(tooManyConcurrentRequests, isWhiskError = false)
    }
    goto(NamespaceThrottled) using ThrottledData(scheduler, dropper)

  // nothing arrived for idleGrace: become Idle only if there is neither work nor any container
  case Event(StateTimeout, RunningData(scheduler, dropper)) =>
    val stillBusy = queue.nonEmpty || (containers.size + creationIds.size) > 0
    if (stillBusy) {
      logging.info(
        this,
        s"[$invocationNamespace:$action:$stateName] The queue is timed out but there are still ${queue.size} activation messages or (running: ${containers.size}, in-progress: ${creationIds.size}) containers")
      stay()
    } else {
      logging.info(
        this,
        s"[$invocationNamespace:$action:$stateName] No activations coming in ${queueConfig.idleGrace}")
      actorSystem.stop(scheduler)
      actorSystem.stop(dropper)

      goto(Idle) using NoActors()
    }

  case Event(FailedCreationJob(creationId, _, _, _, error, message), RunningData(scheduler, dropper)) =>
    creationIds -= creationId.asString
    if (containers.nonEmpty) {
      // if there are already some containers running, activations can be handled anyway.
      stay()
    } else {
      // when there is no container, it moves to the Flushing state as no activations can be invoked
      val isWhiskError = ContainerCreationError.whiskErrors.contains(error)
      if (!isWhiskError) {
        completeAllActivations(message, isWhiskError)
      }
      val errorKind = if (isWhiskError) "whiskError" else "developerError"
      logging.error(
        this,
        s"[$invocationNamespace:$action:$stateName] Failed to create an initial container due to $errorKind, reason: $message.")

      goto(Flushing) using FlushingData(scheduler, dropper, error, message)
    }
}
| |
// there is no timeout for this state as when there is no further message, it would move to the Running state again.
when(NamespaceThrottled) {
  // without any existing or in-progress container nothing can serve the activation, so it is rejected;
  // otherwise it is handled like in the Running state.
  case Event(msg: ActivationMessage, _: ThrottledData) =>
    if (containers.size + creationIds.size == 0) {
      completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = false)
    } else {
      handleActivationMessage(msg)
    }
    stay

  // throttling is lifted: go back to Running with BOTH monitoring actors carried over.
  // The dropping actor must be passed on as such, otherwise its reference is lost and it is never stopped.
  case Event(DisableNamespaceThrottling, data: ThrottledData) =>
    logging.info(this, s"[$invocationNamespace:$action:$stateName] Disable namespace throttling.")
    disableNamespaceThrottling()
    goto(Running) using RunningData(data.schedulerActor, data.droppingActor)
}
| |
// there is no timeout for this state as when there is no further message, it would move to the Running state again.
when(ActionThrottled) {
  // the queue already holds too many activation messages, so every new one is rejected
  case Event(rejected: ActivationMessage, _: ThrottledData) =>
    completeErrorActivation(rejected, tooManyConcurrentRequests, isWhiskError = false)
    stay()
}
| |
when(Idle, stateTimeout = queueConfig.stopGrace) {
  // a new activation wakes the queue up: restart the monitoring actors and handle it
  case Event(activation: ActivationMessage, _: NoActors) =>
    val monitors = startMonitoring()
    handleActivationMessage(activation)
    goto(Running) using RunningData(monitors._1, monitors._2)

  // an idle queue has nothing to hand out
  case Event(poll: GetActivation, _) if poll.action == action =>
    sender() ! GetActivationResponse(Left(NoActivationMessage()))
    stay()

  // nothing happened for stopGrace: remove the queue
  case Event(StateTimeout, _: NoActors) =>
    logging.info(this, s"[$invocationNamespace:$action:$stateName] The queue is timed out, stop the queue.")
    cleanUpDataAndGotoRemoved()

  case Event(GracefulShutdown, _: NoActors) =>
    logging.info(this, s"[$invocationNamespace:$action:$stateName] Received GracefulShutdown, stop the queue.")
    cleanUpDataAndGotoRemoved()

  case Event(StopSchedulingAsOutdated, _: NoActors) =>
    logging.info(this, s"[$invocationNamespace:$action:$stateName] stop further scheduling.")

    cleanUpWatcher()

    // let QueueManager know this queue is no longer in charge.
    context.parent ! staleQueueRemovedMsg

    // since the queue is outdated and there is no activation, delete all old containers.
    val deleteOldContainers = ContainerDeletion(invocationNamespace, action, revision, actionMetaData)
    containerManager ! deleteOldContainers

    goto(Removed) using NoData()
}
| |
when(Flushing) {
  // an initial container is successfully created.
  case Event(SuccessfulCreationJob(creationId, _, _, _), flushing: FlushingData) =>
    creationIds -= creationId.asString

    goto(Running) using RunningData(flushing.schedulerActor, flushing.droppingActor)

  // another creation failed: log it and remember the latest error and message
  case Event(FailedCreationJob(creationId, _, _, _, error, message), flushing: FlushingData) =>
    creationIds -= creationId.asString
    logging.info(
      this,
      s"[$invocationNamespace:$action:$stateName][$creationId] Failed to create a container due to $message")

    stay() using flushing.copy(error = error, reason = message)

  // since there is no container, activations cannot be handled.
  // They are kept for whisk errors and completed with the stored reason otherwise.
  case Event(activation: ActivationMessage, flushing: FlushingData) =>
    logging.info(
      this,
      s"[$invocationNamespace:$action:$stateName] got a new activation message ${activation.activationId}")(
      activation.transid)
    val whiskError = isWhiskError(flushing.error)
    if (!whiskError) {
      completeErrorActivation(activation, flushing.reason, whiskError)
    } else {
      queue = queue.enqueue(TimeSeriesActivationEntry(clock.now(), activation))
    }
    stay() using flushing.copy(activeDuringFlush = true)

  // Since SchedulingDecisionMaker keep sending a message to create a container, this state is not automatically timed out.
  // Instead, StateTimeout message will be sent by a timer.
  case Event(StateTimeout | DropOld, flushing: FlushingData) =>
    logging.info(this, s"[$invocationNamespace:$action:$stateName] Received StateTimeout, drop stale messages.")
    val retention = Duration.ofMillis(actionRetentionTimeout)
    queue = MemoryQueue.dropOld(clock, queue, retention, flushing.reason, completeErrorActivation)

    val quiet = !flushing.activeDuringFlush && queue.isEmpty
    if (quiet) {
      cleanUpActorsAndGotoRemoved(flushing)
    } else {
      // give the queue one more timer period before it may be removed
      stay() using flushing.copy(activeDuringFlush = false)
    }

  case Event(GracefulShutdown, flushing: FlushingData) =>
    completeAllActivations(flushing.reason, isWhiskError(flushing.error))
    logging.info(this, s"[$invocationNamespace:$action:$stateName] Received GracefulShutdown, stop the queue.")
    cleanUpActorsAndGotoRemoved(flushing)

  case Event(StopSchedulingAsOutdated, flushing: FlushingData) =>
    logging.info(this, s"[$invocationNamespace:$action:$stateName] stop further scheduling.")
    completeAllActivations(flushing.reason, isWhiskError(flushing.error))
    // let QueueManager know this queue is no longer in charge.
    context.parent ! staleQueueRemovedMsg
    cleanUpActors(flushing)
    cleanUpData()

    goto(Removed) using NoData()
}
| |
// in case there is any activation in the queue, it waits until all of them are handled.
when(Removing, stateTimeout = queueConfig.gracefulShutdownTimeout) {
  // When there is no message in the queue, SchedulingDecisionMaker would stop sending any message
  // So the queue can be timed out on every gracefulShutdownTimeout
  case Event(QueueRemovedCompleted | StateTimeout, removing: RemovingData) =>
    cleanUpActorsAndGotoRemovedIfPossible(removing)

  case Event(GracefulShutdown, removing: RemovingData) =>
    logging.info(
      this,
      s"[$invocationNamespace:$action:$stateName] The queue received GracefulShutdown trying to stop the queue.")
    cleanUpActorsAndGotoRemovedIfPossible(removing)

  // the action got outdated while the queue was being removed: deal with the remaining
  // activations and remember the queue as outdated
  case Event(StopSchedulingAsOutdated, removing: RemovingData) =>
    logging.info(
      this,
      s"[$invocationNamespace:$action:$stateName] The queue received StopSchedulingAsOutdated trying to stop the queue.")

    handleStaleActivationsWhenActionUpdated(context.parent)

    val outdatedData = removing.copy(outdated = true)
    cleanUpActorsAndGotoRemovedIfPossible(outdatedData)
}
| |
when(Removed, stateTimeout = queueConfig.gracefulShutdownTimeout) {
  // since this Queue will be terminated, rescheduling the msg
  case Event(activation: ActivationMessage, _: NoData) =>
    context.parent ! activation
    stay()

  // this queue is going to stop so let client connect to a new queue
  case Event(poll: GetActivation, _: NoData) if poll.action == action =>
    implicit val tid = poll.transactionId
    logging.info(
      this,
      s"[$invocationNamespace:$action:$stateName] Get activation request ${poll.containerId}, let client connect to a new queue.")
    forwardAllActivations(context.parent)
    sender() ! GetActivationResponse(Left(NoMemoryQueue()))

    stay()

  // actors and data are already wiped
  case Event(QueueRemovedCompleted, _: NoData) =>
    logging.info(this, "stop fsm")
    stop()

  // This is not supposed to happen. This will ensure the queue does not run forever.
  // This can happen when QueueManager could not respond with QueueRemovedCompleted for some reason.
  case Event(StateTimeout, _: NoData) =>
    context.parent ! queueRemovedMsg

    stop()

  // This queue is going to stop, do nothing
  case Event(ignored @ (StopSchedulingAsOutdated | GracefulShutdown), _: NoData) =>
    logging.info(
      this,
      s"[$invocationNamespace:$action:$stateName] The queue received $ignored but do nothing as it is going to stop.")
    stay()
}
| |
// Fallback handlers for every event that the handler of the current state did not match.
whenUnhandled {
  // The queue endpoint is removed, trying to restore it.
  case Event(WatchEndpointRemoved(_, `leaderKey`, value, false), data) =>
    data match {
      case RemovingData(_, _, _) =>
        logging.info(
          this,
          s"[$invocationNamespace:$action:$stateName] This queue is shutdown by `/disable` api, do nothing here.")
      case _ =>
        dataManagementService ! RegisterInitialData(leaderKey, value, failoverEnabled = false, Some(self)) // the watcher is already setup
    }
    stay

  // we don't care about the storage results for namespaceThrottlingKey
  case Event(InitialDataStorageResults(`namespaceThrottlingKey`, _), _) =>
    stay

  // The queue endpoint is restored
  case Event(InitialDataStorageResults(`leaderKey`, Right(_)), _) =>
    stay

  // this can be a case that there is another queue already running.
  // it can happen if a node is segregated by a temporary network rupture and the queue endpoint is removed.
  case Event(InitialDataStorageResults(`leaderKey`, Left(_)), data) =>
    logging.warn(this, s"[$invocationNamespace:$action:$stateName] the queue is superseded by a new queue.")
    // let QueueManager know this queue is no longer in charge.
    context.parent ! queueRemovedMsg

    // forward all activations to the parent queue manager.
    // the parent queue manager is supposed to remove the reference of this queue and forward messages to a new queue
    forwardAllActivations(context.parent)

    // only clean up actors because etcd data is already being used by another queue
    cleanUpActors(data)

    goto(Removed) using NoData()

  // a watched container key disappeared: forget the creation id / container id (last segment of the key)
  case Event(WatchEndpointRemoved(watchKey, key, _, true), _) =>
    watchKey match {
      case `inProgressContainerPrefixKey` =>
        creationIds -= key.split("/").last
      case `existingContainerPrefixKey` =>
        val containerId = key.split("/").last
        removeDeletedContainerFromRequestBuffer(containerId)
        containers -= containerId
      case _ =>
    }
    stay

  // a watched container key appeared: remember the creation id / container id (last segment of the key)
  case Event(WatchEndpointInserted(watchKey, key, _, true), _) =>
    watchKey match {
      case `inProgressContainerPrefixKey` =>
        creationIds += key.split("/").last
      case `existingContainerPrefixKey` =>
        containers += key.split("/").last
      case _ =>
    }
    stay

  // common case for Running, NamespaceThrottled, ActionThrottled
  case Event(SuccessfulCreationJob(creationId, _, _, _), _) =>
    creationIds -= creationId.asString
    stay()

  // for other cases; the namespace, action and revision in the log are the ones of the failed job
  case Event(FailedCreationJob(creationId, invocationNamespace, action, revision, _, message), _) =>
    creationIds -= creationId.asString
    logging.info(
      this,
      s"[$invocationNamespace:$action:$stateName][$creationId] Got failed creation job with revision $revision and error $message.")
    stay()

  // common case for Running, NamespaceThrottled, ActionThrottled, Removing
  case Event(cancel: CancelPoll, _) =>
    // trySuccess is a no-op when the promise has been completed in the meantime
    cancel.promise.trySuccess(Left(NoActivationMessage()))

    stay

  // common case for Running, NamespaceThrottled, ActionThrottled, Removing
  case Event(msg: ActivationMessage, _) =>
    handleActivationMessage(msg)

  // common case for Running, NamespaceThrottled, ActionThrottled, Removing
  case Event(request: GetActivation, _) if request.action == action =>
    implicit val tid = request.transactionId
    if (request.alive) {
      containers += request.containerId
      handleActivationRequest(request)
    } else {
      logging.info(this, s"Remove containerId because ${request.containerId} is not alive")
      removeDeletedContainerFromRequestBuffer(request.containerId)
      sender ! GetActivationResponse(Left(NoActivationMessage()))
      containers -= request.containerId
      stay
    }

  // common case for Running, NamespaceThrottled, ActionThrottled, Removing
  case Event(request: GetActivation, _) if request.action != action =>
    implicit val tid = request.transactionId
    logging.warn(this, s"[$invocationNamespace:$action:$stateName] version mismatch ${request.action}")
    sender ! GetActivationResponse(Left(ActionMismatch()))

    stay

  case Event(DropOld, _) =>
    val retention = Duration.ofMillis(actionRetentionTimeout)
    // Report the drop only when the head of the queue really is older than the retention timeout.
    // The previous check used `< 0`, i.e. it logged "stale" exactly when the head was NOT stale.
    val headIsStale = queue.nonEmpty &&
      Duration.between(queue.head.timestamp, clock.now()).compareTo(retention) >= 0
    if (headIsStale) {
      logging.error(
        this,
        s"[$invocationNamespace:$action:$stateName] Drop some stale activations for $revision, existing container is ${containers.size}, inProgress container is ${creationIds.size}, state data: $stateData, in is $in, current: ${queue.size}.")
      logging.error(
        this,
        s"[$invocationNamespace:$action:$stateName] the head stale message: ${queue.head.msg.activationId}")
    }
    queue = MemoryQueue.dropOld(
      clock,
      queue,
      retention,
      s"Activation processing is not initiated for $actionRetentionTimeout ms",
      completeErrorActivation)

    stay

  // common case for all statuses
  case Event(StatusQuery, _) =>
    sender ! StatusData(invocationNamespace, action.asString, queue.size, stateName.toString, stateData.toString)
    stay

  // Common case for all cases
  case Event(GracefulShutdown, data) =>
    logging.info(this, s"[$invocationNamespace:$action:$stateName] Gracefully shutdown the memory queue.")
    // delete relative data, e.g leaderKey, namespaceThrottlingKey, actionThrottlingKey
    cleanUpData()

    // let the queue manager know this queue is going to stop and let it forward incoming activations to a new queue
    context.parent ! queueRemovedMsg

    goto(Removing) using getRemovingData(data, outdated = false)

  // the version is updated. it's a shared case for all states
  case Event(StopSchedulingAsOutdated, data) =>
    logging.info(this, s"[$invocationNamespace:$action:$stateName] stop further scheduling.")
    // let QueueManager know this queue is no longer in charge.
    context.parent ! staleQueueRemovedMsg

    handleStaleActivationsWhenActionUpdated(context.parent)

    goto(Removing) using getRemovingData(data, outdated = true)

  case Event(t: FailureMessage, _) =>
    logging.error(this, s"[$invocationNamespace:$action:$stateName] got an unexpected failure message: $t")

    stay

  // a scheduling decision: create containers and/or toggle namespace throttling
  case Event(msg: DecisionResults, _) =>
    val DecisionResults(result, num) = msg
    result match {
      case AddInitialContainer if num > 0 =>
        initialized = true
        val msgs = generateContainerCreationMessages(num)
        containerManager ! ContainerCreation(msgs, memory, invocationNamespace)

      case AddContainer if num > 0 =>
        val msgs = generateContainerCreationMessages(num)
        containerManager ! ContainerCreation(msgs, memory, invocationNamespace)

      case enable: EnableNamespaceThrottling =>
        if (num > 0) {
          val msgs = generateContainerCreationMessages(num)
          containerManager ! ContainerCreation(msgs, memory, invocationNamespace)
        }
        // the state change itself is performed by the handler of the current state
        self ! enable

      case DisableNamespaceThrottling =>
        if (num > 0) {
          val msgs = generateContainerCreationMessages(num)
          containerManager ! ContainerCreation(msgs, memory, invocationNamespace)
        }
        self ! DisableNamespaceThrottling

      case Pausing =>
        logging.warn(
          this,
          s"[$invocationNamespace:$action:$stateName] The limit value is less than 0. No activation can be handled so the queue becomes the Flushing state.")
        self ! FailedCreationJob(
          CreationId.void,
          invocationNamespace,
          action,
          revision,
          ZeroNamespaceLimit,
          namespaceLimitUnderZero)

      // Skip, the action-throttling decisions and container requests with num <= 0 need no action here.
      // Without this default they would throw a MatchError inside the actor.
      case _ => // do nothing
    }
    stay

  // this should not happen
  case otherMsg =>
    logging.warn(this, s"[$invocationNamespace:$action:$stateName] received unexpected message: $otherMsg")

    stay
}
| |
onTransition {
  // replay everything that was stashed while the queue was not yet started
  case Uninitialized -> _ =>
    unstashAll()

  // while Flushing, a named timer delivers StateTimeout with a fixed delay of flushGrace
  case _ -> Flushing =>
    startTimerWithFixedDelay("StopQueue", StateTimeout, queueConfig.flushGrace)

  // the timer is only needed while the queue is in the Flushing state
  case Flushing -> _ =>
    cancelTimer("StopQueue")
}
| |
onTermination {
  // whatever the stop reason is, release what this queue owns
  case _ =>
    // the metric scheduler must be canceled when the FSM is terminated
    logScheduler.cancel()

    // the lifecycle of DecisionMaker conforms to the one of MemoryQueue
    actorSystem.stop(decisionMaker)
}

// all handlers are registered, put the FSM into its start state
initialize()
| |
/**
 * Removes the watchers and the registered data of this queue, notifies the parent
 * that the queue is removed and moves to the `Removed` state.
 */
private def cleanUpDataAndGotoRemoved() = {
  cleanUpWatcher()
  cleanUpData()

  // let the parent know that this queue is gone
  context.parent ! queueRemovedMsg

  goto(Removed).using(NoData())
}
| |
/**
 * Stops the monitoring actors held by `data`, removes the registered data,
 * notifies the parent and moves to the `Removed` state.
 *
 * @param data the data of the Flushing state holding the actors to stop
 */
private def cleanUpActorsAndGotoRemoved(data: FlushingData) = {
  cleanUpActors(data)
  cleanUpData()

  // let the parent know that this queue is gone
  context.parent ! queueRemovedMsg

  goto(Removed).using(NoData())
}
| |
/**
 * Moves to the `Removed` state once neither activations nor pending requests are left;
 * otherwise stays in the current state and waits for the next attempt.
 *
 * @param data the data of the Removing state
 */
private def cleanUpActorsAndGotoRemovedIfPossible(data: RemovingData) = {
  // requests whose promise is already completed no longer count
  requestBuffer = requestBuffer.filter(pending => !pending.promise.isCompleted)

  val workLeft = queue.nonEmpty || requestBuffer.nonEmpty
  if (workLeft) {
    logging.info(
      this,
      s"[$invocationNamespace:$action:$stateName] Queue is going to stop but there are still ${queue.size} activations and ${requestBuffer.size} request buffered.")
    stay() // waiting for next timeout
  } else {
    logging.info(this, s"[$invocationNamespace:$action:$stateName] No activation exist. Shutdown the queue.")
    // it can be safely called multiple times as it's idempotent
    cleanUpActors(data)

    // if the queue is outdated, let the container manager know this version of containers is outdated.
    if (data.outdated) {
      containerManager ! ContainerDeletion(invocationNamespace, action, revision, actionMetaData)
    }
    self ! QueueRemovedCompleted

    goto(Removed) using NoData()
  }
}
| |
/**
 * Builds the state data for the `Removing` state out of the current state data.
 *
 * @param data     the current state data
 * @param outdated whether the queue is removed because its action is outdated
 * @return a `RemovingData` carrying the monitoring actors of `data`, or `NoData` when `data` holds none
 */
private def getRemovingData(data: MemoryQueueData, outdated: Boolean): MemoryQueueData =
  data match {
    case removing: RemovingData =>
      removing.copy(outdated = outdated)
    case running: RunningData =>
      RemovingData(running.schedulerActor, running.droppingActor, outdated)
    case throttled: ThrottledData =>
      RemovingData(throttled.schedulerActor, throttled.droppingActor, outdated)
    case flushing: FlushingData =>
      RemovingData(flushing.schedulerActor, flushing.droppingActor, outdated)
    case _ =>
      NoData()
  }
| |
/** Unwatches the container prefixes and the leader key, then closes the namespace container counter. */
private def cleanUpWatcher(): Unit = {
  for (prefixKey <- watchedKeys) {
    watcherService ! UnwatchEndpoint(prefixKey, isPrefix = true, watcherName)
  }
  watcherService ! UnwatchEndpoint(leaderKey, isPrefix = false, watcherName)

  namespaceContainerCount.close()
}
| |
/**
 * Removes the watchers and stops the monitoring actors referenced by `data`, if there are any.
 *
 * @param data the current state data
 */
private def cleanUpActors(data: MemoryQueueData): Unit = {
  cleanUpWatcher()

  // every kind of data except NoData / NoActors carries the two monitoring actors
  val monitors: Option[(ActorRef, ActorRef)] = data match {
    case RunningData(scheduler, dropper)           => Some((scheduler, dropper))
    case ThrottledData(scheduler, dropper)         => Some((scheduler, dropper))
    case FlushingData(scheduler, dropper, _, _, _) => Some((scheduler, dropper))
    case RemovingData(scheduler, dropper, _)       => Some((scheduler, dropper))
    case _                                         => None // do nothing
  }

  monitors.foreach {
    case (scheduler, dropper) =>
      actorSystem.stop(scheduler)
      actorSystem.stop(dropper)
  }
}
| |
/** Unregisters the leader key and both throttling keys from the data management service. */
private def cleanUpData(): Unit =
  Seq(leaderKey, namespaceThrottlingKey, actionThrottlingKey).foreach { registeredKey =>
    dataManagementService ! UnregisterData(registeredKey)
  }
| |
/** Registers both throttling keys with the value "false", i.e. throttling disabled. */
private def initializeThrottling() = {
  val notThrottled = false.toString

  // the namespace key is registered as initial data, the action key as regular data
  dataManagementService ! RegisterInitialData(namespaceThrottlingKey, notThrottled, failoverEnabled = false)
  dataManagementService ! RegisterData(actionThrottlingKey, notThrottled, failoverEnabled = false)
}
| |
/**
 * Enables action throttling when the queue holds at least `maxRetentionSize` activations
 * and the queue is not yet in the `ActionThrottled` state.
 *
 * @return the transition to `ActionThrottled` when the current data is `RunningData`, otherwise the current state
 */
private def tryEnableActionThrottling() = {
  val queueIsFull = queue.size >= queueConfig.maxRetentionSize

  if (!(queueIsFull && stateName != ActionThrottled)) {
    stay()
  } else {
    logging.info(this, s"[$invocationNamespace:$action:$stateName] Enable action throttling.")
    dataManagementService ! RegisterData(actionThrottlingKey, true.toString, failoverEnabled = false)

    // the state only changes when coming from running data; the key is registered either way
    stateData match {
      case running: RunningData =>
        goto(ActionThrottled) using ThrottledData(running.schedulerActor, running.droppingActor)
      case _ =>
        stay()
    }
  }
}
| |
| /** |
|  * Lifts action throttling when the queue is in `ActionThrottled` with `ThrottledData` and has |
|  * drained to `maxRetentionSize * throttlingFraction` activations or fewer; otherwise stays. |
|  */ |
| private def tryDisableActionThrottling()(implicit tid: TransactionId) = { |
|   // the queue must have shrunk to this size before throttling is released |
|   val releaseThreshold = queueConfig.maxRetentionSize * queueConfig.throttlingFraction |
| |
|   stateData match { |
|     case ThrottledData(schedulerActor, droppingActor) |
|         if stateName == ActionThrottled && queue.size <= releaseThreshold => |
|       logging.info(this, s"[$invocationNamespace:$action:$stateName] Disable action throttling.") |
|       dataManagementService ! RegisterData(actionThrottlingKey, false.toString, failoverEnabled = false) |
| |
|       // at this point, namespace throttling might be enabled, |
|       // then the state will be changed to NamespaceThrottled automatically at the next tick |
|       goto(Running) using RunningData(schedulerActor, droppingActor) |
| |
|     case _ => |
|       stay |
|   } |
| } |
| |
| /** Registers "false" under the namespace throttling key (failover disabled). */ |
| private def disableNamespaceThrottling() = { |
|   val throttled = false |
|   dataManagementService ! RegisterData(namespaceThrottlingKey, throttled.toString, failoverEnabled = false) |
| } |
| |
| /** Registers "true" under the namespace throttling key (failover disabled). */ |
| private def enableNamespaceThrottling() = { |
|   val throttled = true |
|   dataManagementService ! RegisterData(namespaceThrottlingKey, throttled.toString, failoverEnabled = false) |
| } |
| |
| /** |
|  * Finishes an activation with an error instead of running it. |
|  * |
|  * Emits the scheduler wait-time metric, builds a zero-duration fallback activation carrying the |
|  * error, sends a `ConcurrentRateLimit` user event when the error is the "too many concurrent |
|  * requests" developer error, acknowledges the activation and finally stores it. |
|  * |
|  * @param activation   the activation to complete |
|  * @param message      the error message put into the activation response |
|  * @param isWhiskError `true` to report a whisk (system) error, `false` for a developer error |
|  * @return the future of the store operation; the acknowledgement future is not part of the result |
|  */ |
| private def completeErrorActivation(activation: ActivationMessage, |
|                                     message: String, |
|                                     isWhiskError: Boolean): Future[Any] = { |
|   val transid = activation.transid |
| |
|   logging.error( |
|     this, |
|     s"[$invocationNamespace:$action:$stateName] complete activation ${activation.activationId} with error $message")( |
|     transid) |
| |
|   val waitTimeInScheduler = Interval(transid.meta.start, Instant.now()).duration |
|   MetricEmitter.emitHistogramMetric( |
|     LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), |
|     waitTimeInScheduler.toMillis) |
| |
|   val errorResponse = |
|     if (isWhiskError) ActivationResponse.whiskError(message) |
|     else ActivationResponse.developerError(message) |
|   val fallbackActivation = generateFallbackActivation(activation, errorResponse) |
| |
|   // TODO change scheduler instance id |
|   val instance = InvokerInstanceId(0, userMemory = 0.MB) |
| |
|   // blocking invocations get the result together with the completion |
|   val ackMsg = |
|     if (!activation.blocking) CompletionMessage(transid, fallbackActivation, instance) |
|     else CombinedCompletionAndResultMessage(transid, fallbackActivation, instance) |
| |
|   val rejectedByConcurrencyLimit = !isWhiskError && message == tooManyConcurrentRequests |
|   if (rejectedByConcurrencyLimit) { |
|     val metric = Metric("ConcurrentRateLimit", 1) |
|     val event = EventMessage( |
|       schedulerId.toString, |
|       metric, |
|       activation.user.subject, |
|       invocationNamespace, |
|       activation.user.namespace.uuid, |
|       metric.typeName) |
|     UserEvents.send(messagingProducer, event) |
|   } |
| |
|   ack( |
|     transid, |
|     fallbackActivation, |
|     activation.blocking, |
|     activation.rootControllerIndex, |
|     activation.user.namespace.uuid, |
|     ackMsg).andThen { |
|     case Failure(t) => |
|       logging.error(this, s"[$invocationNamespace:$action:$stateName] failed to send ack due to $t") |
|   } |
| |
|   store(transid, fallbackActivation, UserContext(activation.user)).andThen { |
|     case Failure(t) => |
|       logging.error(this, s"[$invocationNamespace:$action:$stateName] failed to store activation due to $t") |
|   } |
| } |
| |
| /** |
|  * Empties the queue, sending every queued activation message to the queue manager in FIFO order. |
|  * |
|  * @param queueManager the actor that receives the forwarded messages |
|  */ |
| private def forwardAllActivations(queueManager: ActorRef): Unit = { |
|   while (!queue.isEmpty) { |
|     val (oldest, remaining) = queue.dequeue |
|     queue = remaining |
| |
|     val msg = oldest.msg |
|     logging.info(this, s"Forward msg ${msg.activationId} to the queue manager")(msg.transid) |
|     queueManager ! msg |
|   } |
| } |
| |
| /** |
|  * Decides what happens to still-queued activations after the action was updated. |
|  * |
|  * With no container left for the old version the activations are forwarded to the queue manager; |
|  * otherwise they are left in place for the old containers to fetch. An empty queue is a no-op. |
|  * |
|  * @param queueManager the actor that receives the activations when they have to be forwarded |
|  */ |
| private def handleStaleActivationsWhenActionUpdated(queueManager: ActorRef): Unit = { |
|   if (queue.isEmpty) { |
|     // nothing is waiting, so there is nothing to hand over |
|   } else if (containers.isEmpty) { |
|     // if doesn't exist old container to pull old memoryQueue's activation, send the old activations to queueManager |
|     logging.warn( |
|       this, |
|       s"[$invocationNamespace:$action:$stateName] does not exist old version container to fetch the old version activation") |
|     forwardAllActivations(queueManager) |
|   } else { |
|     logging.info( |
|       this, |
|       s"[$invocationNamespace:$action:$stateName] old version activation would be fetched by old version container") |
|   } |
| } |
| |
| /** |
|  * Empties the queue, completing every queued activation with the given error in FIFO order. |
|  * |
|  * @param reason       the error message each activation is completed with |
|  * @param isWhiskError whether the error is reported as a whisk error or as a developer error |
|  */ |
| @tailrec |
| private def completeAllActivations(reason: String, isWhiskError: Boolean): Unit = |
|   queue.dequeueOption match { |
|     case Some((oldest, remaining)) => |
|       queue = remaining |
|       completeErrorActivation(oldest.msg, reason, isWhiskError) |
|       completeAllActivations(reason, isWhiskError) |
|     case None => |
|       () |
|   } |
| |
| /** |
|  * Starts the two periodic jobs of this queue and returns them as `(monitoringScheduler, droppingScheduler)`. |
|  * |
|  *  - the dropping job runs every `schedulingConfig.dropInterval` and calls `checkToDropStaleActivation` |
|  *  - the monitoring job runs every `schedulingConfig.checkInterval`; it refreshes `averageDuration` from |
|  *    the duration buffer, looks up the user limit and sends a `QueueSnapshot` to the decision maker |
|  * |
|  * Since there is no initial delay, it will try to create a container at initialization time. |
|  * These schedulers will run forever and stop when the memory queue stops. |
|  * |
|  * @return the monitoring scheduler first, the dropping scheduler second |
|  */ |
| private def startMonitoring(): (ActorRef, ActorRef) = { |
|   val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () => |
|     checkToDropStaleActivation( |
|       clock, |
|       queue, |
|       actionRetentionTimeout, |
|       invocationNamespace, |
|       actionMetaData, |
|       stateName, |
|       self) |
|     Future.successful(()) |
|   } |
| |
|   val monitoringScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.checkInterval) { () => |
|     // the average duration is updated every checkInterval |
|     if (averageDurationBuffer.nonEmpty) { |
|       averageDuration = Some(averageDurationBuffer.average) |
|     } |
|     getUserLimit(invocationNamespace).andThen { |
|       case Success(limit) => |
|         decisionMaker ! QueueSnapshot( |
|           initialized, |
|           in, |
|           queue.size, |
|           containers.size, |
|           creationIds.size, |
|           getStaleActivationNum(0, queue), |
|           namespaceContainerCount.existingContainerNumByNamespace, |
|           namespaceContainerCount.inProgressContainerNumByNamespace, |
|           averageDuration, |
|           limit, |
|           stateName, |
|           self) |
|       case Failure(_: NoDocumentException) => |
|         // no limit available for the namespace |
|         self ! StopSchedulingAsOutdated |
|       case Failure(t) => |
|         // no snapshot is sent for this tick, so no scheduling decision is made; |
|         // log the cause instead of dropping it silently |
|         logging.warn( |
|           this, |
|           s"[$invocationNamespace:$action:$stateName] failed to get the user limit due to $t, skip sending a queue snapshot") |
|     } |
|   } |
|   (monitoringScheduler, droppingScheduler) |
| } |
| |
| /** |
|  * Schedules a single, immediate average-duration lookup for this action. |
|  * |
|  * `averageDuration` is only overwritten when the lookup reports at least one hit. |
|  */ |
| private def getAverageDuration() = { |
|   // check the duration only once |
|   val noDelay = duration.Duration.Zero |
| |
|   actorSystem.scheduler.scheduleOnce(noDelay) { |
|     durationChecker.checkAverageDuration(invocationNamespace, actionMetaData) { checkResult => |
|       val hasSamples = checkResult.hitCount > 0 |
|       if (hasSamples) averageDuration = checkResult.averageDuration |
|       checkResult |
|     } |
|   } |
| } |
| |
| /** |
|  * Counts the activations at the front of the queue that have waited for at least `StaleDuration`. |
|  * |
|  * Counting stops at the first entry that is younger than that, so only the leading run is counted. |
|  * |
|  * @param count the starting value the number of stale entries is added to |
|  * @param queue the queue to inspect, oldest entry first |
|  * @return `count` plus the number of stale entries at the front of `queue` |
|  */ |
| private def getStaleActivationNum(count: Int, queue: Queue[TimeSeriesActivationEntry]): Int = { |
|   val staleAtFront = queue.iterator.takeWhile { entry => |
|     Duration.between(entry.timestamp, clock.now()).compareTo(StaleDuration) >= 0 |
|   }.size |
| |
|   count + staleAtFront |
| } |
| |
| /** |
|  * Builds `num` container creation messages for this action and records each creation id |
|  * in `creationIds`. |
|  * |
|  * @param num how many messages to build; zero or a negative number yields an empty list |
|  * @return the created messages, in creation order |
|  */ |
| private def generateContainerCreationMessages(num: Int) = { |
|   List.fill(num) { |
|     val creationMsg = ContainerCreationMessage( |
|       TransactionId.containerCreation, |
|       invocationNamespace, |
|       action, |
|       revision, |
|       actionMetaData, |
|       schedulerId, |
|       endpoints.host, |
|       endpoints.rpcPort) |
|     val creationId = creationMsg.creationId.asString |
| |
|     creationIds += creationId |
|     logging.info( |
|       this, |
|       s"[$invocationNamespace:$action:$stateName] Try to create a new container with creationId $creationId") |
|     creationMsg |
|   } |
| } |
| |
| /** |
|  * Takes the first request out of `requestBuffer` whose promise has not been completed yet. |
|  * |
|  * Requests with an already completed promise are dropped from the buffer first. |
|  * |
|  * @return the promise of the dequeued request, or `None` when no open request is left |
|  */ |
| private def takeUncompletedRequest(): Option[Promise[Either[MemoryQueueError, ActivationMessage]]] = { |
|   requestBuffer = requestBuffer.filterNot(_.promise.isCompleted) |
| |
|   if (requestBuffer.isEmpty) None |
|   else Some(requestBuffer.dequeue.promise) |
| } |
| |
| /** |
|  * Drops the buffered poll requests of a deleted container and answers each of them with |
|  * `NoActivationMessage`. |
|  * |
|  * @param containerId the id of the deleted container, without the one-character prefix that |
|  *                    buffered requests carry in front of their container id |
|  */ |
| private def removeDeletedContainerFromRequestBuffer(containerId: String): Unit = { |
|   requestBuffer = requestBuffer.filter { pending => |
|     // the first character of the buffered id is not part of the container id |
|     val fromDeletedContainer = pending.containerId.drop(1) == containerId |
|     if (fromDeletedContainer) { |
|       pending.promise.trySuccess(Left(NoActivationMessage())) |
|     } |
|     !fromDeletedContainer |
|   } |
| } |
| |
| /** |
|  * Handles a newly arrived activation message. |
|  * |
|  * If a poll request is waiting, the message completes its promise right away and the state is |
|  * kept. Otherwise the message is enqueued with the current time, and action throttling is |
|  * enabled if the queue has become too long. |
|  * |
|  * @param msg the incoming activation message |
|  */ |
| private def handleActivationMessage(msg: ActivationMessage) = { |
|   logging.info(this, s"[$invocationNamespace:$action:$stateName] got a new activation message ${msg.activationId}")( |
|     msg.transid) |
|   in.incrementAndGet() |
| |
|   takeUncompletedRequest() match { |
|     case Some(waitingRequest) => |
|       val waitTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration |
|       MetricEmitter.emitHistogramMetric( |
|         LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), |
|         waitTimeInScheduler.toMillis) |
|       waitingRequest.trySuccess(Right(msg)) |
|       in.decrementAndGet() |
|       stay |
| |
|     case None => |
|       queue = queue.enqueue(TimeSeriesActivationEntry(clock.now(), msg)) |
|       in.decrementAndGet() |
|       tryEnableActionThrottling() |
|   } |
| } |
| |
| /** |
|  * Answers a container's request for an activation. |
|  * |
|  * The duration reported with the request, if any, is added to the average-duration buffer. |
|  * When an activation is queued, the oldest one is sent to the requester and action throttling |
|  * is released if the queue is short enough; when the queue is empty the request is parked |
|  * via `pollForActivation`. |
|  * |
|  * @param request the poll request sent by a container |
|  */ |
| private def handleActivationRequest(request: GetActivation)(implicit tid: TransactionId) = { |
|   request.lastDuration.foreach(lastDuration => averageDurationBuffer.add(lastDuration)) |
| |
|   queue.dequeueOption match { |
|     case Some((TimeSeriesActivationEntry(_, msg), remaining)) => |
|       queue = remaining |
|       logging.info( |
|         this, |
|         s"[$invocationNamespace:$action:$stateName] Get activation request ${request.containerId}, send one message: ${msg.activationId}") |
|       val waitTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration |
|       MetricEmitter.emitHistogramMetric( |
|         LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), |
|         waitTimeInScheduler.toMillis) |
| |
|       sender ! GetActivationResponse(Right(msg)) |
|       tryDisableActionThrottling() |
| |
|     case None => |
|       pollForActivation(sender, request) |
|       stay |
|   } |
| } |
| |
| /** |
|  * Parks a poll request until an activation can be handed to the requesting container. |
|  * |
|  * The promise is saved in `requestBuffer`; once a new activation message comes in, the promise is |
|  * completed with it. After `pollTimeOut` a `CancelPoll` for the promise is sent to this queue |
|  * (presumably completing it with `NoActivationMessage`; that handler is not part of this method). |
|  * Whatever the promise completes with is sent back to `sender`. |
|  * |
|  * @param sender  the actor waiting for the activation |
|  * @param request the poll request carrying the container id and its warmed flag |
|  */ |
| private def pollForActivation(sender: ActorRef, request: GetActivation)(implicit tid: TransactionId): Unit = { |
|   val promise = Promise[Either[MemoryQueueError, ActivationMessage]]() |
|   val cancelPoll = actorSystem.scheduler.scheduleOnce(pollTimeOut) { |
|     self ! CancelPoll(promise) |
|   } |
| |
|   // "1xxx" is always bigger than "0xxx", so warmed containers will be taken first when dequeuing from `requestBuffer` |
|   val warmedFlag = if (request.warmed) 1 else 0 |
|   // string interpolation instead of `Int + String`, which is deprecated since Scala 2.13; the key is unchanged |
|   requestBuffer.enqueue(BufferedRequest(s"$warmedFlag${request.containerId}", promise)) |
|   promise.future.onComplete { |
|     case Success(value) => |
|       sender ! GetActivationResponse(value) |
|       value match { |
|         case Right(msg) => |
|           logging.info( |
|             this, |
|             s"[$invocationNamespace:$action:$stateName] Send msg ${msg.activationId} to waiting request ${request.containerId}") |
|           // an activation was delivered, so the pending timeout is no longer needed |
|           cancelPoll.cancel() |
|         case Left(_) => // do nothing |
|       } |
|     case Failure(t) => // this shouldn't happen |
|       logging.error( |
|         this, |
|         s"[$invocationNamespace:$action:$stateName] Unexpected error ${t.getMessage} while poll for activation.") |
|       sender ! GetActivationResponse(Left(NoActivationMessage())) |
|       cancelPoll.cancel() |
|   } |
| } |
| |
| /** |
|  * Builds an activation record that starts and ends at the current clock time and reports a |
|  * duration of zero. Usually used for error cases. |
|  * |
|  * @param msg      the activation message the record is created for |
|  * @param response the response to store in the record |
|  * @return the activation record, annotated with path, kind, limits and, where applicable, the |
|  *         caused-by-sequence marker and the binding |
|  */ |
| private def generateFallbackActivation(msg: ActivationMessage, response: ActivationResponse): WhiskActivation = { |
|   val timestamp = clock.now() |
| |
|   // only activations that are part of a sequence carry the causedBy annotation |
|   val causedBy = |
|     if (!msg.causedBySequence) None |
|     else Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE))) |
| |
|   val limits = Parameters(WhiskActivation.limitsAnnotation, actionMetaData.limits.toJson) |
|   val binding = actionMetaData.binding.map { boundPackage => |
|     Parameters(WhiskActivation.bindingAnnotation, JsString(boundPackage.asString)) |
|   } |
| |
|   // the path annotation holds the action name without its version |
|   val path = Parameters(WhiskActivation.pathAnnotation, JsString(msg.action.copy(version = None).asString)) |
|   val kind = Parameters(WhiskActivation.kindAnnotation, JsString(actionMetaData.exec.kind)) |
| |
|   WhiskActivation( |
|     activationId = msg.activationId, |
|     namespace = msg.user.namespace.name.toPath, |
|     subject = msg.user.subject, |
|     cause = msg.cause, |
|     name = msg.action.name, |
|     version = msg.action.version.getOrElse(SemVer()), |
|     start = timestamp, |
|     end = timestamp, |
|     duration = Some(0), |
|     response = response, |
|     annotations = path ++ kind ++ causedBy ++ limits ++ binding) |
| } |
| |
| /** Tells whether the given container creation error is one of `ContainerCreationError.whiskErrors`. */ |
| private def isWhiskError(error: ContainerCreationError): Boolean = |
|   ContainerCreationError.whiskErrors.exists(_ == error) |
| } |
| |
| /** Factory and stateless helpers for the `MemoryQueue` actor. */ |
| object MemoryQueue { |
|   /** Queue settings, loaded from `ConfigKeys.schedulerQueue` when this object is initialized. */ |
|   private[queue] val queueConfig = loadConfigOrThrow[QueueConfig](ConfigKeys.schedulerQueue) |
| |
|   /** |
|    * Creates the `Props` for a `MemoryQueue` that runs on the system clock, drops stale |
|    * activations with [[checkToDropStaleActivation]] and uses the loaded `queueConfig`. |
|    */ |
|   def props(etcdClient: EtcdClient, |
|             durationChecker: DurationChecker, |
|             fqn: FullyQualifiedEntityName, |
|             messagingProducer: MessageProducer, |
|             schedulingConfig: SchedulingConfig, |
|             invocationNamespace: String, |
|             revision: DocRevision, |
|             endpoints: SchedulerEndpoints, |
|             actionMetaData: WhiskActionMetaData, |
|             dataManagementService: ActorRef, |
|             watcherService: ActorRef, |
|             containerManager: ActorRef, |
|             decisionMaker: ActorRef, |
|             schedulerId: SchedulerInstanceId, |
|             ack: ActiveAck, |
|             store: (TransactionId, WhiskActivation, UserContext) => Future[Any], |
|             getUserLimit: String => Future[Int])(implicit logging: Logging): Props = { |
|     implicit val clock: Clock = SystemClock |
|     Props( |
|       new MemoryQueue( |
|         etcdClient, |
|         durationChecker, |
|         fqn, |
|         messagingProducer, |
|         schedulingConfig, |
|         invocationNamespace, |
|         revision, |
|         endpoints, |
|         actionMetaData, |
|         dataManagementService, |
|         watcherService, |
|         containerManager, |
|         decisionMaker, |
|         schedulerId, |
|         ack, |
|         store, |
|         getUserLimit, |
|         checkToDropStaleActivation, |
|         queueConfig)) |
|   } |
| |
|   /** |
|    * Removes the leading entries of `queue` that have waited for at least `retention`, completing |
|    * each of them as a whisk error with `reason`. |
|    * |
|    * @return the queue without the dropped entries; it starts at the first entry younger than |
|    *         `retention` |
|    */ |
|   @tailrec |
|   def dropOld( |
|     clock: Clock, |
|     queue: Queue[TimeSeriesActivationEntry], |
|     retention: Duration, |
|     reason: String, |
|     completeErrorActivation: (ActivationMessage, String, Boolean) => Future[Any]): Queue[TimeSeriesActivationEntry] = |
|     queue.headOption match { |
|       case Some(oldest) if Duration.between(oldest.timestamp, clock.now()).compareTo(retention) >= 0 => |
|         completeErrorActivation(oldest.msg, reason, true) |
|         dropOld(clock, queue.tail, retention, reason, completeErrorActivation) |
|       case _ => |
|         queue |
|     } |
| |
|   /** |
|    * Sends `DropOld` to `queueRef` when the oldest queued activation has waited for at least |
|    * `maxRetentionMs` milliseconds; does nothing for an empty queue. |
|    */ |
|   def checkToDropStaleActivation(clock: Clock, |
|                                  queue: Queue[TimeSeriesActivationEntry], |
|                                  maxRetentionMs: Long, |
|                                  invocationNamespace: String, |
|                                  actionMetaData: WhiskActionMetaData, |
|                                  stateName: MemoryQueueState, |
|                                  queueRef: ActorRef)(implicit logging: Logging) = { |
|     val action = actionMetaData.fullyQualifiedName(true) |
|     logging.debug( |
|       this, |
|       s"[$invocationNamespace:$action:$stateName] use the given retention timeout: $maxRetentionMs for this action kind: ${actionMetaData.exec.kind}.") |
| |
|     queue.headOption |
|       .filter { oldest => |
|         Duration.between(oldest.timestamp, clock.now()).compareTo(Duration.ofMillis(maxRetentionMs)) >= 0 |
|       } |
|       .foreach { oldest => |
|         logging.info( |
|           this, |
|           s"[$invocationNamespace:$action:$stateName] some activations are stale msg: ${oldest.msg.activationId}.") |
| |
|         queueRef ! DropOld |
|       } |
|   } |
| |
|   /** Picks the retention timeout in milliseconds: the blackbox value for blackbox actions, the regular one otherwise. */ |
|   private def getRetentionTimeout(actionMetaData: WhiskActionMetaData, queueConfig: QueueConfig): Long = { |
|     val isBlackbox = actionMetaData.exec.kind == ExecMetaDataBase.BLACKBOX |
|     if (!isBlackbox) queueConfig.maxRetentionMs |
|     else queueConfig.maxBlackboxRetentionMs |
|   } |
| } |
| |
| /** |
|  * Point-in-time view of a memory queue; `MemoryQueue.startMonitoring` builds it and sends it to |
|  * the decision maker. |
|  * |
|  * @param initialized the queue's `initialized` flag |
|  * @param incomingMsgCount the queue's `in` counter, which is raised while an activation message is being handled |
|  * @param currentMsgCount number of activations currently in the queue |
|  * @param existingContainerCount size of the queue's `containers` collection |
|  * @param inProgressContainerCount size of the queue's `creationIds` collection |
|  * @param staleActivationNum number of activations at the front of the queue older than `StaleDuration` |
|  * @param existingContainerCountInNamespace `existingContainerNumByNamespace` of the namespace container counter |
|  * @param inProgressContainerCountInNamespace `inProgressContainerNumByNamespace` of the namespace container counter |
|  * @param averageDuration the average duration known to the queue, if any |
|  * @param limit the value returned by `getUserLimit` for the invocation namespace |
|  * @param stateName the FSM state of the queue when the snapshot was taken |
|  * @param recipient the queue actor that produced the snapshot |
|  */ |
| case class QueueSnapshot(initialized: Boolean, |
| incomingMsgCount: AtomicInteger, |
| currentMsgCount: Int, |
| existingContainerCount: Int, |
| inProgressContainerCount: Int, |
| staleActivationNum: Int, |
| existingContainerCountInNamespace: Int, |
| inProgressContainerCountInNamespace: Int, |
| averageDuration: Option[Double], |
| limit: Int, |
| stateName: MemoryQueueState, |
| recipient: ActorRef) |
| |
| /** |
|  * Memory queue settings; `MemoryQueue.queueConfig` loads them from `ConfigKeys.schedulerQueue`. |
|  * |
|  * @param maxRetentionSize queue size at which action throttling is enabled |
|  * @param maxRetentionMs retention timeout in milliseconds for actions that are not blackbox actions |
|  * @param maxBlackboxRetentionMs retention timeout in milliseconds for blackbox actions |
|  * @param throttlingFraction fraction of `maxRetentionSize` the queue has to shrink to before action |
|  *                           throttling is disabled again |
|  * |
|  * NOTE(review): `idleGrace`, `stopGrace`, `flushGrace`, `gracefulShutdownTimeout` and `durationBufferSize` |
|  * are consumed outside the reviewed part of the file; document them against their call sites. |
|  */ |
| case class QueueConfig(idleGrace: FiniteDuration, |
| stopGrace: FiniteDuration, |
| flushGrace: FiniteDuration, |
| gracefulShutdownTimeout: FiniteDuration, |
| maxRetentionSize: Int, |
| maxRetentionMs: Long, |
| maxBlackboxRetentionMs: Long, |
| throttlingFraction: Double, |
| durationBufferSize: Int) |
| |
| /** |
|  * A poll request parked in the memory queue's request buffer. |
|  * |
|  * @param containerId the warmed flag digit ("1" for a warmed container, "0" otherwise) followed by the container id |
|  * @param promise completed with the activation handed to the container, or with a `MemoryQueueError` |
|  */ |
| case class BufferedRequest(containerId: String, promise: Promise[Either[MemoryQueueError, ActivationMessage]]) |
| /** |
|  * Sent to a queue actor by `MemoryQueue.checkToDropStaleActivation` when the oldest queued |
|  * activation has waited for at least the retention timeout. |
|  */ |
| case object DropOld |
| |
| /** |
|  * Bundles an action revision, an invoker id and a container id. |
|  * |
|  * NOTE(review): presumably the parts parsed out of a container key; it is not used in the reviewed |
|  * part of the file, so confirm against the call site. |
|  */ |
| case class ContainerKeyMeta(revision: DocRevision, invokerId: Int, containerId: String) |