| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.openwhisk.core.scheduler.container |
| import java.nio.charset.StandardCharsets |
| import java.util.concurrent.ThreadLocalRandom |
| |
| import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props} |
| import akka.event.Logging.InfoLevel |
| import org.apache.kafka.clients.producer.RecordMetadata |
| import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy} |
| import org.apache.openwhisk.common.{GracefulShutdown, InvokerHealth, Logging, LoggingMarkers, TransactionId} |
| import org.apache.openwhisk.core.connector.ContainerCreationError.{ |
| NoAvailableInvokersError, |
| NoAvailableResourceInvokersError |
| } |
| import org.apache.openwhisk.core.connector._ |
| import org.apache.openwhisk.core.entity.size._ |
| import org.apache.openwhisk.core.entity.{ |
| Annotations, |
| ByteSize, |
| DocInfo, |
| DocRevision, |
| FullyQualifiedEntityName, |
| InvokerInstanceId, |
| MemoryLimit, |
| SchedulerInstanceId |
| } |
| import org.apache.openwhisk.core.etcd.EtcdClient |
| import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix |
| import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, InvokerKeys} |
| import org.apache.openwhisk.core.etcd.EtcdType._ |
| import org.apache.openwhisk.core.scheduler.Scheduler |
| import org.apache.openwhisk.core.scheduler.message.{ |
| ContainerCreation, |
| ContainerDeletion, |
| ContainerKeyMeta, |
| CreationJobState, |
| FailedCreationJob, |
| RegisterCreationJob, |
| ReschedulingCreationJob, |
| SuccessfulCreationJob |
| } |
| import org.apache.openwhisk.core.service.{ |
| DeleteEvent, |
| PutEvent, |
| UnwatchEndpoint, |
| WatchEndpoint, |
| WatchEndpointInserted, |
| WatchEndpointRemoved |
| } |
| import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig} |
| import pureconfig.generic.auto._ |
| import pureconfig.loadConfigOrThrow |
| import spray.json.DefaultJsonProtocol._ |
| |
| import scala.collection.JavaConverters._ |
| import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} |
| import scala.collection.concurrent.TrieMap |
| import scala.util.{Failure, Success} |
| |
| case class ScheduledPair(msg: ContainerCreationMessage, invokerId: InvokerInstanceId) |
| |
| case class BlackboxFractionConfig(managedFraction: Double, blackboxFraction: Double) |
| |
| class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef, |
| provider: MessagingProvider, |
| schedulerInstanceId: SchedulerInstanceId, |
| etcdClient: EtcdClient, |
| config: WhiskConfig, |
| watcherService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging) |
| extends Actor { |
| private implicit val ec: ExecutionContextExecutor = context.dispatcher |
| |
| private val creationJobManager = jobManagerFactory(context) |
| |
| private val messagingProducer = provider.getProducer(config) |
| |
| private var warmedContainers = Set.empty[String] |
| |
| private val warmedInvokers = TrieMap[Int, String]() |
| |
| private val inProgressWarmedContainers = TrieMap.empty[String, String] |
| |
| private val warmKey = ContainerKeys.warmedPrefix |
| private val invokerKey = InvokerKeys.prefix |
| private val watcherName = s"container-manager" |
| |
| watcherService ! WatchEndpoint(warmKey, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent)) |
| watcherService ! WatchEndpoint(invokerKey, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent)) |
| |
| override def receive: Receive = { |
| case ContainerCreation(msgs, memory, invocationNamespace) => |
| createContainer(msgs, memory, invocationNamespace) |
| |
| case ContainerDeletion(invocationNamespace, fqn, revision, whiskActionMetaData) => |
| getInvokersWithOldContainer(invocationNamespace, fqn, revision) |
| .map { invokers => |
| val msg = ContainerDeletionMessage( |
| TransactionId.containerDeletion, |
| invocationNamespace, |
| fqn, |
| revision, |
| whiskActionMetaData) |
| invokers.foreach(sendDeletionContainerToInvoker(messagingProducer, _, msg)) |
| } |
| |
| case rescheduling: ReschedulingCreationJob => |
| val msg = rescheduling.toCreationMessage(schedulerInstanceId, rescheduling.retry + 1) |
| createContainer( |
| List(msg), |
| rescheduling.actionMetaData.limits.memory.megabytes.MB, |
| rescheduling.invocationNamespace) |
| |
| case WatchEndpointInserted(watchKey, key, _, true) => |
| watchKey match { |
| case `warmKey` => warmedContainers += key |
| case `invokerKey` => |
| val invoker = InvokerKeys.getInstanceId(key) |
| warmedInvokers.getOrElseUpdate(invoker.instance, { |
| warmUpInvoker(invoker) |
| invoker.toString |
| }) |
| } |
| |
| case WatchEndpointRemoved(watchKey, key, _, true) => |
| watchKey match { |
| case `warmKey` => warmedContainers -= key |
| case `invokerKey` => |
| val invoker = InvokerKeys.getInstanceId(key) |
| warmedInvokers.remove(invoker.instance) |
| } |
| |
| case FailedCreationJob(cid, _, _, _, _, _) => |
| inProgressWarmedContainers.remove(cid.asString) |
| |
| case SuccessfulCreationJob(cid, _, _, _) => |
| inProgressWarmedContainers.remove(cid.asString) |
| |
| case GracefulShutdown => |
| watcherService ! UnwatchEndpoint(warmKey, isPrefix = true, watcherName) |
| watcherService ! UnwatchEndpoint(invokerKey, isPrefix = true, watcherName) |
| creationJobManager ! GracefulShutdown |
| |
| case _ => |
| } |
| |
| private def createContainer(msgs: List[ContainerCreationMessage], |
| memory: ByteSize, |
| invocationNamespace: String): Unit = { |
| logging.info(this, s"received ${msgs.size} creation message [${msgs.head.invocationNamespace}:${msgs.head.action}]") |
| val coldCreations = filterWarmedCreations(msgs) |
| if (coldCreations.nonEmpty) |
| ContainerManager |
| .getAvailableInvokers(etcdClient, memory, invocationNamespace) |
| .flatMap { invokers => |
| if (invokers.isEmpty) { |
| coldCreations.foreach { msg => |
| ContainerManager.sendState( |
| FailedCreationJob( |
| msg.creationId, |
| msg.invocationNamespace, |
| msg.action, |
| msg.revision, |
| NoAvailableInvokersError, |
| s"No available invokers.")) |
| } |
| Future.failed(NoCapacityException("No available invokers.")) |
| } else { |
| coldCreations.foreach { msg => |
| creationJobManager ! RegisterCreationJob(msg) |
| } |
| |
| Future { |
| ContainerManager |
| .schedule(invokers, coldCreations, memory) |
| .map { pair => |
| sendCreationContainerToInvoker(messagingProducer, pair.invokerId.toInt, pair.msg) |
| } |
| } |
| }.andThen { |
| case Failure(t) => logging.warn(this, s"Failed to create container caused by: $t") |
| } |
| } |
| } |
| |
| private def getInvokersWithOldContainer(invocationNamespace: String, |
| fqn: FullyQualifiedEntityName, |
| currentRevision: DocRevision): Future[List[Int]] = { |
| val namespacePrefix = containerPrefix(ContainerKeys.namespacePrefix, invocationNamespace, fqn) |
| val warmedPrefix = containerPrefix(ContainerKeys.warmedPrefix, invocationNamespace, fqn) |
| |
| for { |
| existing <- etcdClient |
| .getPrefix(namespacePrefix) |
| .map { res => |
| res.getKvsList.asScala.map { kv => |
| parseExistingContainerKey(namespacePrefix, kv.getKey) |
| } |
| } |
| warmed <- etcdClient |
| .getPrefix(warmedPrefix) |
| .map { res => |
| res.getKvsList.asScala.map { kv => |
| parseWarmedContainerKey(warmedPrefix, kv.getKey) |
| } |
| } |
| } yield { |
| (existing ++ warmed) |
| .dropWhile(k => k.revision > currentRevision) // remain latest revision |
| .groupBy(k => k.invokerId) // remove duplicated value |
| .map(_._2.head.invokerId) |
| .toList |
| } |
| } |
| |
| /** |
| * existingKey format: {tag}/namespace/{invocationNamespace}/{namespace}/({pkg}/)/{name}/{revision}/invoker{id}/container/{containerId} |
| */ |
| private def parseExistingContainerKey(prefix: String, existingKey: String): ContainerKeyMeta = { |
| val keys = existingKey.replace(prefix, "").split("/") |
| val revision = DocRevision(keys(0)) |
| val invokerId = keys(1).replace("invoker", "").toInt |
| val containerId = keys(3) |
| ContainerKeyMeta(revision, invokerId, containerId) |
| } |
| |
| /** |
| * warmedKey format: {tag}/warmed/{invocationNamespace}/{namespace}/({pkg}/)/{name}/{revision}/invoker/{id}/container/{containerId} |
| */ |
| private def parseWarmedContainerKey(prefix: String, warmedKey: String): ContainerKeyMeta = { |
| val keys = warmedKey.replace(prefix, "").split("/") |
| val revision = DocRevision(keys(0)) |
| val invokerId = keys(2).toInt |
| val containerId = keys(4) |
| ContainerKeyMeta(revision, invokerId, containerId) |
| } |
| |
| // Filter out messages which can use warmed container |
| private def filterWarmedCreations(msgs: List[ContainerCreationMessage]) = { |
| msgs.filter { msg => |
| val warmedPrefix = containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action) |
| val chosenInvoker = warmedContainers |
| .filter(!inProgressWarmedContainers.values.toSeq.contains(_)) |
| .find { container => |
| if (container.startsWith(warmedPrefix)) { |
| logging.info(this, s"Choose a warmed container $container") |
| inProgressWarmedContainers.update(msg.creationId.asString, container) |
| true |
| } else |
| false |
| } |
| .map(_.split("/").takeRight(3).apply(0)) |
| if (chosenInvoker.nonEmpty) { |
| creationJobManager ! RegisterCreationJob(msg) |
| sendCreationContainerToInvoker(messagingProducer, chosenInvoker.get.toInt, msg) |
| false |
| } else |
| true |
| } |
| } |
| |
| private def sendCreationContainerToInvoker(producer: MessageProducer, |
| invoker: Int, |
| msg: ContainerCreationMessage): Future[RecordMetadata] = { |
| implicit val transid: TransactionId = msg.transid |
| |
| val topic = s"${Scheduler.topicPrefix}invoker$invoker" |
| val start = transid.started(this, LoggingMarkers.SCHEDULER_KAFKA, s"posting to $topic") |
| |
| producer.send(topic, msg).andThen { |
| case Success(status) => |
| transid.finished( |
| this, |
| start, |
| s"posted creationId: ${msg.creationId} for ${msg.invocationNamespace}/${msg.action} to ${status |
| .topic()}[${status.partition()}][${status.offset()}]", |
| logLevel = InfoLevel) |
| case Failure(_) => |
| logging.error(this, s"Failed to create container for ${msg.action}, error: error on posting to topic $topic") |
| transid.failed(this, start, s"error on posting to topic $topic") |
| } |
| } |
| |
| private def sendDeletionContainerToInvoker(producer: MessageProducer, |
| invoker: Int, |
| msg: ContainerDeletionMessage): Future[RecordMetadata] = { |
| implicit val transid: TransactionId = msg.transid |
| |
| val topic = s"${Scheduler.topicPrefix}invoker$invoker" |
| val start = transid.started(this, LoggingMarkers.SCHEDULER_KAFKA, s"posting to $topic") |
| |
| producer.send(topic, msg).andThen { |
| case Success(status) => |
| transid.finished( |
| this, |
| start, |
| s"posted deletion for ${msg.invocationNamespace}/${msg.action} to ${status |
| .topic()}[${status.partition()}][${status.offset()}]", |
| logLevel = InfoLevel) |
| case Failure(_) => |
| logging.error(this, s"Failed to delete container for ${msg.action}, error: error on posting to topic $topic") |
| transid.failed(this, start, s"error on posting to topic $topic") |
| } |
| } |
| |
| private def warmUpInvoker(invoker: InvokerInstanceId): Unit = { |
| logging.info(this, s"Warm up invoker $invoker") |
| WarmUp.warmUpContainerCreationMessage(schedulerInstanceId).foreach { |
| sendCreationContainerToInvoker(messagingProducer, invoker.instance, _) |
| } |
| } |
| |
| // warm up all invokers |
| private def warmUp() = { |
| // warm up exist invokers |
| ContainerManager.getAvailableInvokers(etcdClient, MemoryLimit.MIN_MEMORY).map { invokers => |
| invokers.foreach { invoker => |
| warmedInvokers.getOrElseUpdate(invoker.id.instance, { |
| warmUpInvoker(invoker.id) |
| invoker.id.toString |
| }) |
| } |
| } |
| |
| } |
| |
| warmUp() |
| } |
| |
| object ContainerManager { |
| val fractionConfig: BlackboxFractionConfig = |
| loadConfigOrThrow[BlackboxFractionConfig](ConfigKeys.fraction) |
| |
| private val managedFraction: Double = Math.max(0.0, Math.min(1.0, fractionConfig.managedFraction)) |
| private val blackboxFraction: Double = Math.max(1.0 - managedFraction, Math.min(1.0, fractionConfig.blackboxFraction)) |
| |
| def props(jobManagerFactory: ActorRefFactory => ActorRef, |
| provider: MessagingProvider, |
| schedulerInstanceId: SchedulerInstanceId, |
| etcdClient: EtcdClient, |
| config: WhiskConfig, |
| watcherService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging): Props = |
| Props(new ContainerManager(jobManagerFactory, provider, schedulerInstanceId, etcdClient, config, watcherService)) |
| |
| /** |
| * The rng algorithm is responsible for the invoker distribution, and the better the distribution, the smaller the number of rescheduling. |
| * |
| */ |
| def rng(mod: Int): Int = ThreadLocalRandom.current().nextInt(mod) |
| |
| /** |
| * Assign an invoker to a message |
| * |
| * Assumption |
| * - The memory of each invoker is larger than minMemory. |
| * - Messages that are not assigned an invoker are discarded. |
| * |
| * @param invokers Available invoker pool |
| * @param msgs Messages to which the invoker will be assigned |
| * @param minMemory Minimum memory for all invokers |
| * @return A pair of messages and assigned invokers |
| */ |
| def schedule(invokers: List[InvokerHealth], msgs: List[ContainerCreationMessage], minMemory: ByteSize)( |
| implicit logging: Logging): List[ScheduledPair] = { |
| logging.info(this, s"usable total invoker size: ${invokers.size}") |
| val noTaggedInvokers = invokers.filter(_.id.tags.isEmpty) |
| val managed = Math.max(1, Math.ceil(noTaggedInvokers.size.toDouble * managedFraction).toInt) |
| val blackboxes = Math.max(1, Math.floor(noTaggedInvokers.size.toDouble * blackboxFraction).toInt) |
| val managedInvokers = noTaggedInvokers.take(managed) |
| val blackboxInvokers = noTaggedInvokers.takeRight(blackboxes) |
| logging.info( |
| this, |
| s"${msgs.size} creation messages for ${msgs.head.invocationNamespace}/${msgs.head.action}, managedFraction:$managedFraction, blackboxFraction:$blackboxFraction, managed invoker size:$managed, blackboxes invoker size:$blackboxes") |
| val list = msgs |
| .foldLeft((List.empty[ScheduledPair], invokers)) { (tuple, msg: ContainerCreationMessage) => |
| val pairs = tuple._1 |
| val candidates = tuple._2 |
| |
| val requiredResources = |
| msg.whiskActionMetaData.annotations |
| .getAs[Seq[String]](Annotations.InvokerResourcesAnnotationName) |
| .getOrElse(Seq.empty[String]) |
| val resourcesStrictPolicy = msg.whiskActionMetaData.annotations |
| .getAs[Boolean](Annotations.InvokerResourcesStrictPolicyAnnotationName) |
| .getOrElse(true) |
| val isBlackboxInvocation = msg.whiskActionMetaData.toExecutableWhiskAction.map(_.exec.pull).getOrElse(false) |
| if (requiredResources.isEmpty) { |
| // only choose managed invokers or blackbox invokers |
| val wantedInvokers = if (isBlackboxInvocation) { |
| candidates.filter(c => blackboxInvokers.map(b => b.id.instance).contains(c.id.instance)).toSet |
| } else { |
| candidates.filter(c => managedInvokers.map(m => m.id.instance).contains(c.id.instance)).toSet |
| } |
| val taggedInvokers = candidates.filter(_.id.tags.nonEmpty) |
| |
| if (wantedInvokers.nonEmpty) { |
| chooseInvokerFromCandidates(wantedInvokers.toList, invokers, pairs, msg) |
| } else if (taggedInvokers.nonEmpty) { // if not found from the wanted invokers, choose tagged invokers then |
| chooseInvokerFromCandidates(taggedInvokers, invokers, pairs, msg) |
| } else { |
| sendState( |
| FailedCreationJob( |
| msg.creationId, |
| msg.invocationNamespace, |
| msg.action, |
| msg.revision, |
| NoAvailableInvokersError, |
| s"No available invokers.")) |
| (pairs, candidates) |
| } |
| } else { |
| val wantedInvokers = candidates.filter(health => requiredResources.toSet.subsetOf(health.id.tags.toSet)) |
| if (wantedInvokers.nonEmpty) { |
| chooseInvokerFromCandidates(wantedInvokers, invokers, pairs, msg) |
| } else if (resourcesStrictPolicy) { |
| sendState( |
| FailedCreationJob( |
| msg.creationId, |
| msg.invocationNamespace, |
| msg.action, |
| msg.revision, |
| NoAvailableResourceInvokersError, |
| s"No available invokers with resources $requiredResources.")) |
| (pairs, candidates) |
| } else { |
| val (noTaggedInvokers, taggedInvokers) = candidates.partition(_.id.tags.isEmpty) |
| if (noTaggedInvokers.nonEmpty) { // choose no tagged invokers first |
| chooseInvokerFromCandidates(noTaggedInvokers, invokers, pairs, msg) |
| } else { |
| val leftInvokers = |
| taggedInvokers.filterNot(health => requiredResources.toSet.subsetOf(health.id.tags.toSet)) |
| if (leftInvokers.nonEmpty) |
| chooseInvokerFromCandidates(leftInvokers, invokers, pairs, msg) |
| else { |
| sendState( |
| FailedCreationJob( |
| msg.creationId, |
| msg.invocationNamespace, |
| msg.action, |
| msg.revision, |
| NoAvailableInvokersError, |
| s"No available invokers.")) |
| (pairs, candidates) |
| } |
| } |
| } |
| } |
| } |
| ._1 // pairs |
| list |
| } |
| |
| private def chooseInvokerFromCandidates( |
| candidates: List[InvokerHealth], |
| wholeInvokers: List[InvokerHealth], |
| pairs: List[ScheduledPair], |
| msg: ContainerCreationMessage)(implicit logging: Logging): (List[ScheduledPair], List[InvokerHealth]) = { |
| val idx = rng(mod = candidates.size) |
| val instance = candidates(idx) |
| // it must be compared to the instance unique id |
| val idxInWhole = wholeInvokers.indexOf(wholeInvokers.filter(p => p.id.instance == instance.id.instance).head) |
| val requiredMemory = msg.whiskActionMetaData.limits.memory.megabytes |
| val updated = |
| if (instance.id.userMemory.toMB - requiredMemory >= requiredMemory) { // Since ByteSize is negative, it converts to long type and compares. |
| wholeInvokers.updated( |
| idxInWhole, |
| instance.copy(id = instance.id.copy(userMemory = instance.id.userMemory - requiredMemory.MB))) |
| } else { |
| // drop the nth element |
| val split = wholeInvokers.splitAt(idxInWhole) |
| val _ :: t = split._2 |
| split._1 ::: t |
| } |
| |
| (ScheduledPair(msg, instance.id) :: pairs, updated) |
| } |
| |
| private def sendState(state: CreationJobState)(implicit logging: Logging): Unit = { |
| QueuePool.get(MemoryQueueKey(state.invocationNamespace, state.action.toDocId.asDocInfo(state.revision))) match { |
| case Some(memoryQueueValue) if memoryQueueValue.isLeader => |
| memoryQueueValue.queue ! state |
| case _ => |
| logging.error(this, s"get a $state for a nonexistent memory queue or a follower") |
| } |
| } |
| |
| protected[scheduler] def getAvailableInvokers(etcd: EtcdClient, minMemory: ByteSize, invocationNamespace: String)( |
| implicit executor: ExecutionContext): Future[List[InvokerHealth]] = { |
| etcd |
| .getPrefix(InvokerKeys.prefix) |
| .map { res => |
| res.getKvsList.asScala |
| .map { kv => |
| InvokerResourceMessage |
| .parse(kv.getValue.toString(StandardCharsets.UTF_8)) |
| .map { resourceMessage => |
| val status = resourceMessage.status match { |
| case Healthy.asString => Healthy |
| case Unhealthy.asString => Unhealthy |
| case _ => Offline |
| } |
| |
| val temporalId = InvokerKeys.getInstanceId(kv.getKey.toString(StandardCharsets.UTF_8)) |
| val invoker = temporalId.copy( |
| userMemory = resourceMessage.freeMemory.MB, |
| busyMemory = Some(resourceMessage.busyMemory.MB), |
| tags = resourceMessage.tags, |
| dedicatedNamespaces = resourceMessage.dedicatedNamespaces) |
| |
| InvokerHealth(invoker, status) |
| } |
| .getOrElse(InvokerHealth(InvokerInstanceId(kv.getKey, userMemory = 0.MB), Offline)) |
| } |
| .filter(i => i.status.isUsable) |
| .filter(_.id.userMemory >= minMemory) |
| .filter { invoker => |
| invoker.id.dedicatedNamespaces.isEmpty || invoker.id.dedicatedNamespaces.contains(invocationNamespace) |
| } |
| .toList |
| } |
| } |
| |
| protected[scheduler] def getAvailableInvokers(etcd: EtcdClient, minMemory: ByteSize)( |
| implicit executor: ExecutionContext): Future[List[InvokerHealth]] = { |
| etcd |
| .getPrefix(InvokerKeys.prefix) |
| .map { res => |
| res.getKvsList.asScala |
| .map { kv => |
| InvokerResourceMessage |
| .parse(kv.getValue.toString(StandardCharsets.UTF_8)) |
| .map { resourceMessage => |
| val status = resourceMessage.status match { |
| case Healthy.asString => Healthy |
| case Unhealthy.asString => Unhealthy |
| case _ => Offline |
| } |
| |
| val temporalId = InvokerKeys.getInstanceId(kv.getKey.toString(StandardCharsets.UTF_8)) |
| val invoker = temporalId.copy( |
| userMemory = resourceMessage.freeMemory.MB, |
| busyMemory = Some(resourceMessage.busyMemory.MB), |
| tags = resourceMessage.tags, |
| dedicatedNamespaces = resourceMessage.dedicatedNamespaces) |
| InvokerHealth(invoker, status) |
| } |
| .getOrElse(InvokerHealth(InvokerInstanceId(kv.getKey, userMemory = 0.MB), Offline)) |
| } |
| .filter(i => i.status.isUsable) |
| .filter(_.id.userMemory >= minMemory) |
| .toList |
| } |
| } |
| |
| } |
| |
| case class NoCapacityException(msg: String) extends Exception(msg) |
| |
| /** |
| * TODO This needs to be moved to the QueueManager component that will be added later. |
| */ |
| object QueuePool { |
| private val _queuePool = TrieMap[MemoryQueueKey, MemoryQueueValue]() |
| |
| private[scheduler] def get(key: MemoryQueueKey) = _queuePool.get(key) |
| |
| private[scheduler] def put(key: MemoryQueueKey, value: MemoryQueueValue) = _queuePool.put(key, value) |
| |
| private[scheduler] def remove(key: MemoryQueueKey) = _queuePool.remove(key) |
| |
| private[scheduler] def countLeader() = _queuePool.count(_._2.isLeader) |
| |
| private[scheduler] def clear() = _queuePool.clear() |
| |
| private[scheduler] def size = _queuePool.size |
| |
| private[scheduler] def values = _queuePool.values |
| |
| private[scheduler] def keys = _queuePool.keys |
| } |
| case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo) |
| case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean) |