| /* |
| * Copyright 2015-2016 IBM Corporation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package whisk.core.container |
| |
| import java.nio.file.Files |
| import java.nio.file.Paths |
| import java.util.concurrent.locks.ReentrantLock |
| import java.util.Timer |
| import java.util.TimerTask |
| import java.util.concurrent.ConcurrentLinkedQueue |
| import scala.collection.concurrent.TrieMap |
| import scala.collection.mutable.ListBuffer |
| import scala.concurrent.duration._ |
| import scala.annotation.tailrec |
| import akka.actor.ActorSystem |
| import whisk.common.Counter |
| import whisk.common.Logging |
| import whisk.common.TimingUtil |
| import whisk.common.TransactionId |
| import whisk.core.WhiskConfig |
| import whisk.core.WhiskConfig.dockerImageTag |
| import whisk.core.WhiskConfig.invokerContainerNetwork |
| import whisk.core.WhiskConfig.invokerContainerPolicy |
| import whisk.core.WhiskConfig.invokerCoreShare |
| import whisk.core.WhiskConfig.invokerNumCore |
| import whisk.core.WhiskConfig.selfDockerEndpoint |
| import whisk.core.entity.ActionLimits |
| import whisk.core.entity.MemoryLimit |
| import whisk.core.entity.LogLimit |
| import whisk.core.entity.TimeLimit |
| import whisk.core.entity.WhiskAction |
| import whisk.core.entity.WhiskAuth |
| import whisk.core.entity.WhiskAuthStore |
| import whisk.core.entity.WhiskEntityStore |
| import whisk.core.entity.NodeJS6Exec |
| import akka.event.Logging.LogLevel |
| import akka.event.Logging.InfoLevel |
| import whisk.core.entity.BlackBoxExec |
| |
| /** |
| * A thread-safe container pool that internalizes container creation/teardown and allows users |
| * to check out a container. |
| * |
| * Synchronization via "this" is used to maintain integrity of the data structures. |
| * A separate object "gcSync" is used to prevent multiple GC's from occurring. |
| * |
| * TODO: for now supports only one container per key |
| * TODO: for now does not allow concurrent container creation |
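| * |
| * Example usage (a sketch; assumes a populated WhiskConfig, an ActorSystem and an |
| * implicit TransactionId are in scope): |
| * {{{ |
| * val pool = new ContainerPool(config) |
| * pool.getAction(action, auth) foreach { case (container, initResult) => |
| *   // ... run the activation against the container, then return it |
| *   pool.putBack(container) |
| * } |
| * }}} |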
| */ |
| class ContainerPool( |
| config: WhiskConfig, |
| invokerInstance: Integer = 0, |
| verbosity: LogLevel = InfoLevel, |
| standalone: Boolean = false)(implicit actorSystem: ActorSystem) |
| extends ContainerUtils { |
| |
| // These must be defined before verbosity is set |
| private val datastore = WhiskEntityStore.datastore(config) |
| private val authStore = WhiskAuthStore.datastore(config) |
| setVerbosity(verbosity) |
| |
| val dockerhost = config.selfDockerEndpoint |
| |
| // Eventually, we will have a more sophisticated warmup strategy that does multiple sizes |
| private val defaultMemoryLimit = MemoryLimit(MemoryLimit.STD_MEMORY) |
| |
| /** |
| * Sets verbosity of this and owned objects. |
| */ |
| override def setVerbosity(level: LogLevel) = { |
| super.setVerbosity(level) |
| datastore.setVerbosity(level) |
| authStore.setVerbosity(level) |
| } |
| |
| /** |
| * Enables GC. |
| */ |
| def enableGC(): Unit = { |
| gcOn = true |
| } |
| |
| /** |
| * Disables GC. If disabled, overrides other flags/methods. |
| */ |
| def disableGC(): Unit = { |
| gcOn = false |
| } |
| |
| /** |
| * Immediately performs a GC of all idle containers, blocking the caller until it completes. |
| */ |
| def forceGC()(implicit transid: TransactionId): Unit = { |
| removeAllIdle({ containerInfo => true }) |
| } |
| |
| /* |
| * Getter/Setter for various GC parameters. |
| */ |
| def gcThreshold: FiniteDuration = _gcThreshold |
| def maxIdle: Int = _maxIdle // container count |
| def maxActive: Int = _maxActive // container count |
| def gcThreshold_=(value: FiniteDuration): Unit = _gcThreshold = (Duration.Zero max value) |
| def maxIdle_=(value: Int): Unit = _maxIdle = Math.max(0, value) |
| def maxActive_=(value: Int): Unit = _maxActive = Math.max(0, value) |
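| // e.g. pool.gcThreshold = 1.minute tightens idle reclamation; negative values are clamped to Duration.Zero |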
| |
| def resetMaxIdle() = _maxIdle = defaultMaxIdle |
| def resetMaxActive() = { |
| _maxActive = ContainerPool.getDefaultMaxActive(config) |
| info(this, s"maxActive set to ${_maxActive}") |
| } |
| def resetGCThreshold() = _gcThreshold = defaultGCThreshold |
| |
| /* |
| * Controls where docker container logs are put. |
| */ |
| def logDir: String = _logDir |
| def logDir_=(value: String): Unit = _logDir = value |
| |
| /* |
| * How many containers are in the pool at the moment? |
| * We also count containers that we are trying to start but have not yet inserted into the data structure. |
| */ |
| def idleCount() = countByState(State.Idle) |
| def activeCount() = countByState(State.Active) |
| private val startingCounter = new Counter() |
| private var shuttingDown = false |
| |
| /* |
| * Tracks requests for getting containers. |
| * The first value doled out via nextPosition.next() will be 1 and completedPosition.cur remains at 0 until completion. |
| */ |
| private val nextPosition = new Counter() |
| private val completedPosition = new Counter() |
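| // Example: with slack() == 1, the request whose positionInLine == 1 proceeds while the |
| // request at positionInLine == 2 sleeps and retries in getImpl until completedPosition advances. |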
| |
| /* |
| * Lists ALL containers on this docker host with "docker ps -a --no-trunc". |
| * This could include containers not in this pool at all. |
| */ |
| def listAll()(implicit transid: TransactionId): Seq[ContainerState] = listContainers(true) |
| |
| /** |
| * Retrieves (and possibly creates) a container based on the subject and versioned action. |
| * A flag is included to indicate whether initialization succeeded. |
| * The invariant of returning the container back to the pool holds regardless of whether init succeeded or not. |
| * In case of failure to start a container, None is returned. |
| */ |
| def getAction(action: WhiskAction, auth: WhiskAuth)(implicit transid: TransactionId): Option[(WhiskContainer, Option[RunResult])] = |
| if (shuttingDown) { |
| info(this, s"Shutting down: Not getting container for ${action.fullyQualifiedName} with ${auth.uuid}") |
| None |
| } else { |
| try { |
| val myPos = nextPosition.next() |
| info(this, s"""Getting container for ${action.fullyQualifiedName} of kind ${action.exec.kind} with ${auth.uuid}: |
| | myPos = $myPos |
| | completed = ${completedPosition.cur} |
| | slack = ${slack()} |
| | startingCounter = ${startingCounter.cur}""".stripMargin) |
| val key = ActionContainerId(auth.uuid, action.fullyQualifiedName, action.rev) |
| getImpl(1, myPos, key, () => makeWhiskContainer(action, auth)) map { |
| case (c, initResult) => |
| (c.asInstanceOf[WhiskContainer], initResult) |
| } |
| } finally { |
| completedPosition.next() |
| } |
| } |
| |
| /* |
| * For testing |
| */ |
| def getByImageName(imageName: String, args: Array[String])(implicit transid: TransactionId): Option[Container] = { |
| info(this, s"Getting container for image $imageName with args " + args.mkString(" ")) |
| // Not a regular key. Doesn't matter in testing. |
| val key = new ActionContainerId("instantiated." + imageName + args.mkString("_")) |
| getImpl(1, 0, key, () => makeContainer(key, imageName, args)) map { _._1 } |
| } |
| |
| /** |
| * Tries to get/create a container via the thunk by delegating to getOrMake. |
| * This method retries while the pool is busy, blocking the caller until a container is obtained or an error occurs. |
| */ |
| @tailrec |
| final def getImpl(tryCount: Int, position: Int, key: ActionContainerId, conMaker: () => FinalContainerResult)(implicit transid: TransactionId): Option[(Container, Option[RunResult])] = { |
| val positionInLine = position - completedPosition.cur // this will be 1 if at the front of the line |
| val available = slack() |
| if (tryCount % 100 == 0) { |
| warn(this, s"""getImpl possibly stuck because still in line: |
| | position = $position |
| | completed = ${completedPosition.cur} |
| | slack = $available |
| | maxActive = ${_maxActive} |
| | activeCount = ${activeCount()} |
| | startingCounter = ${startingCounter.cur}""".stripMargin) |
| } |
| if (positionInLine > available) { // e.g. if there is 1 available, then I wait if I am second in line (positionInLine = 2) |
| Thread.sleep(50) // TODO: replace with wait/notify but tricky to get right because of desire for maximal concurrency |
| } else getOrMake(key, conMaker) match { |
| case Success(con, initResult) => |
| info(this, s"Obtained container ${con.containerId.id}") |
| return Some((con, initResult)) |
| case Error(str) => |
| error(this, s"Error starting container: $str") |
| return None |
| case Busy => |
| // This will not cause a busy loop because only those that could be productive will get a chance |
| } |
| getImpl(tryCount + 1, position, key, conMaker) |
| } |
| |
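| /** |
| * Counts the idle containers available under the given key; used by the nanny |
| * thread to decide whether another warm stem cell container is needed. |
| */ |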
| def getNumberOfIdleContainers(key: ActionContainerId)(implicit transid: TransactionId): Int = { |
| this.synchronized { |
| keyMap.get(key) map { bucket => bucket.count { _.isIdle() } } getOrElse 0 |
| } |
| } |
| |
| /* |
| * How many containers can we start? Containers still starting are not yet counted as active, so we must include startingCounter. |
| * The use of a method rather than a getter is meant to signify the synchronization in the implementation. |
| */ |
| private def slack() = _maxActive - (activeCount() + startingCounter.cur) |
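| // e.g. with _maxActive = 8, activeCount() = 5 and startingCounter.cur = 2, slack() == 1: |
| // exactly one more container may be started before the pool reports Busy |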
| |
| /* |
| * Tries to get or create a container, returning Busy if there are too many |
| * active containers. |
| * |
| * The multiple synchronization blocks, together with startingCounter, keep the |
| * container count accurately tracked and the data structures consistent while |
| * keeping all lengthy operations outside of the lock. |
| * |
| * The returned container will be active (not paused). |
| */ |
| def getOrMake(key: ActionContainerId, conMaker: () => FinalContainerResult)(implicit transid: TransactionId): FinalContainerResult = { |
| retrieve(key) match { |
| case CacheMiss => { |
| conMaker() match { /* We make the container outside synchronization */ |
| // Unfortunately, variables are not allowed in pattern alternatives even when the types line up. |
| case res @ Success(con, initResult) => |
| this.synchronized { |
| val ci = introduceContainer(key, con) |
| ci.state = State.Active |
| res |
| } |
| case res @ Error(_) => res |
| case Busy => |
| assert(false) |
| null // conMaker only returns Success or Error |
| } |
| } |
| case s @ Success(con, initResult) => |
| con.transid = transid |
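| // the unpause below runs under the docker lock but outside the pool lock, so a |
| // slow docker daemon delays other docker operations rather than pool bookkeeping |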
| runDockerOp { con.unpause() } |
| s |
| case b @ Busy => b |
| case e @ Error(_) => e |
| } |
| } |
| |
| /** |
| * Obtains a pre-existing container from the pool, moving it to the Active state without docker unpausing it. |
| * If we are over capacity, signals Busy. |
| * If no idle container exists for the key, indicates a cache miss. |
| */ |
| def retrieve(key: ActionContainerId)(implicit transid: TransactionId): ContainerResult = { |
| this.synchronized { |
| // first check if there is a matching container and only if there aren't any |
| // determine if the pool is full or has capacity to accommodate a new container; |
| // this allows any new containers introduced into the pool to be reused if already idle |
| val bucket = keyMap.getOrElseUpdate(key, new ListBuffer()) |
| bucket.find({ ci => ci.isIdle() }) match { |
| case None => |
| if (activeCount() + startingCounter.cur >= _maxActive) { |
| Busy |
| } else { |
| CacheMiss |
| } |
| case Some(ci) => { |
| ci.state = State.Active |
| Success(ci.container, None) |
| } |
| } |
| } |
| } |
| |
| /** |
| * Moves a container from one bucket (i.e. key) to a different one. |
| * This operation is performed when we specialize a pre-warmed container to an action. |
| * ContainerMap does not need to be updated as the Container <-> ContainerInfo relationship does not change. |
| */ |
| def changeKey(ci: ContainerInfo, oldKey: ActionContainerId, newKey: ActionContainerId)(implicit transid: TransactionId) = { |
| this.synchronized { |
| assert(ci.state == State.Active) |
| assert(keyMap.contains(oldKey)) |
| val oldBucket = keyMap(oldKey) |
| val newBucket = keyMap.getOrElseUpdate(newKey, new ListBuffer()) |
| oldBucket -= ci |
| newBucket += ci |
| } |
| } |
| |
| /** |
| * Returns the container to the pool or deletes it altogether. |
| * This call can be slow, but it does not hold the data structure lock while it is, so it does not interfere with other activations. |
| */ |
| def putBack(container: Container, delete: Boolean = false)(implicit transid: TransactionId): Unit = { |
| info(this, s"""putBack returning container ${container.id} |
| | delete = $delete |
| | completed = ${completedPosition.cur} |
| | slack = ${slack()} |
| | maxActive = ${_maxActive} |
| | activeCount = ${activeCount()} |
| | startingCounter = ${startingCounter.cur}""".stripMargin) |
| // Docker operation outside sync block. Don't pause if we are deleting. |
| if (!delete) { |
| runDockerOp { |
| // pausing eagerly is pessimal; there could be an action waiting |
| // that will immediately unpause the same container to reuse it; |
| // to skip pausing, will need to inspect the queue of waiting activations |
| // for a matching key |
| container.pause() |
| } |
| } |
| |
| val toBeDeleted = this.synchronized { // Return container to pool logically and then optionally delete |
| // Always put back logically for consistency |
| val Some(ci) = containerMap.get(container) |
| assert(ci.state == State.Active) |
| ci.lastUsed = System.currentTimeMillis() |
| ci.state = State.Idle |
| val toBeDeleted = if (delete) { |
| removeContainerInfo(ci) // no docker operation here |
| List(ci) |
| } else { |
| List() |
| } |
| this.notify() |
| toBeDeleted |
| } |
| |
| toBeDeleted.foreach(toBeRemoved.offer(_)) |
| // Perform capacity-based GC here. |
| if (gcOn) { // Synchronization occurs inside calls in a fine-grained manner. |
| while (idleCount() > _maxIdle) { // it is safe for this to be non-atomic with body |
| removeOldestIdle() |
| } |
| } |
| } |
| |
| // ------------------------------------------------------------------------------------------------------------ |
| |
| object State extends Enumeration { |
| val Idle, Active = Value |
| } |
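| // Lifecycle: a container enters the pool Idle via introduceContainer (getOrMake promotes it |
| // to Active right away on a cache miss), flips between Active and Idle through retrieve/putBack, |
| // and leaves via removeContainerInfo followed by teardown in the nanny thread. |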
| |
| /** |
| * Wraps a Container to attach ContainerPool-specific information. |
| */ |
| class ContainerInfo(k: ActionContainerId, con: Container) { |
| val key = k |
| val container = con |
| var state = State.Idle |
| var lastUsed = System.currentTimeMillis() |
| def isIdle() = state == State.Idle |
| } |
| |
| private val containerMap = new TrieMap[Container, ContainerInfo] |
| private val keyMap = new TrieMap[ActionContainerId, ListBuffer[ContainerInfo]] |
| |
| // These are containers that are already removed from the data structure waiting to be docker-removed |
| private val toBeRemoved = new ConcurrentLinkedQueue[ContainerInfo]() |
| |
| // Note that the prefix separates the namespace of this key from regular keys. |
| // TODO: Generalize across languages by storing the image name once other languages are supported. |
| // TODO: Better heuristic for # of containers to keep warm - make it sensitive to idle capacity. |
| private val warmNodejsKey = WarmNodeJsActionContainerId |
| private val nodejsExec = NodeJS6Exec("", None) |
| private val WARM_NODEJS_CONTAINERS = 2 |
| |
| private def keyMapToString(): String = { |
| keyMap.map(p => s"[${p._1.stringRepr} -> ${p._2}]").mkString(" ") |
| } |
| |
| // Easier to walk containerMap than keyMap |
| private def countByState(state: State.Value) = this.synchronized { containerMap.count({ case (_, ci) => ci.state == state }) } |
| |
| // Sample container name: wsk1_1_joeibmcomhelloWorldDemo_20150901T202701852Z |
| private def makeContainerName(localName: String): ContainerName = |
| ContainerCounter.containerName(invokerInstance.toString(), localName) |
| |
| private def makeContainerName(action: WhiskAction): ContainerName = |
| makeContainerName(action.fullyQualifiedName) |
| |
| /** |
| * dockerLock is a fair lock used to serialize all docker operations except pull. |
| * However, a non-pull operation can run concurrently with a pull operation. |
| */ |
| val dockerLock = new ReentrantLock(true) |
| |
| /** |
| * dockerPullLock is used to serialize all pull operations. |
| */ |
| val dockerPullLock = new ReentrantLock(true) |
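| // Both locks are fair (FIFO), so a burst of queued container starts cannot |
| // indefinitely starve a pause/unpause issued by putBack or getOrMake. |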
| |
| /* A background thread that |
| * 1. Kills leftover action containers on startup |
| * 2. Periodically re-populates the container pool with fresh (un-instantiated) nodejs containers. |
| * 3. Periodically tears down containers that have logically been removed from the system |
| */ |
| private def nannyThread(allContainers: Seq[ContainerState]) = new Thread { |
| override def run { |
| implicit val tid = TransactionId.invokerWarmup |
| if (!standalone) killStragglers(allContainers) |
| while (true) { |
| Thread.sleep(100) // serves to prevent busy looping |
| // Create a new stem cell if the number of warm containers is below the allowed count, |
| // but only while there is slack, so that actions waiting to create a container are |
| // not held back. Note: since this check is not fully synchronized, it is possible to |
| // start this operation while there is slack and still end up waiting on the docker lock later. |
| if (!standalone && getNumberOfIdleContainers(warmNodejsKey) < WARM_NODEJS_CONTAINERS && slack() > 0) { |
| makeWarmNodejsContainer()(tid) |
| } |
| // We grab the size first so we know there has been enough delay for anything we are shutting down |
| val size = toBeRemoved.size() |
| 1 to size foreach { _ => |
| val ci = toBeRemoved.poll() |
| if (ci != null) { // should never happen but defensive |
| Thread.sleep(100) // serves to not hog docker lock and add slack |
| teardownContainer(ci.container) |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Gracefully terminates by shutting down containers upon SIGTERM. |
| * If one desires to kill the invoker without this, send it SIGKILL. |
| */ |
| private def shutdown() = { |
| implicit val id = TransactionId.invokerWarmup |
| shuttingDown = true |
| killStragglers(listAll()) |
| } |
| |
| /** |
| * All docker operations from the pool must pass through here (except for pull). |
| */ |
| private def runDockerOp[T](dockerOp: => T)(implicit transid: TransactionId): T = { |
| runDockerOpWithLock(dockerLock, dockerOp) |
| } |
| |
| /** |
| * All docker pull operations from the pool must pass through here. |
| */ |
| private def runDockerPull[T](dockerOp: => T)(implicit transid: TransactionId): T = { |
| runDockerOpWithLock(dockerPullLock, dockerOp) |
| } |
| |
| /** |
| * Runs a docker operation while holding the given lock, warning if it exceeds the slow threshold. |
| */ |
| private def runDockerOpWithLock[T](lock: ReentrantLock, dockerOp: => T)(implicit transid: TransactionId): T = { |
| lock.lock() |
| try { |
| val (elapsed, result) = TimingUtil.time { dockerOp } |
| if (elapsed > slowDockerThreshold) { |
| warn(this, s"Docker operation took $elapsed") |
| } |
| result |
| } finally { |
| lock.unlock() |
| } |
| } |
| |
| private def makeWarmNodejsContainer()(implicit transid: TransactionId): WhiskContainer = { |
| val imageName = WhiskAction.containerImageName(nodejsExec, config.dockerRegistry, config.dockerImagePrefix, config.dockerImageTag) |
| val limits = ActionLimits(TimeLimit(), defaultMemoryLimit, LogLimit()) |
| val containerName = makeContainerName("warmJsContainer") |
| |
| info(this, "Starting warm nodejs container") |
| |
| val con = makeGeneralContainer(warmNodejsKey, containerName, imageName, limits, false) |
| |
| this.synchronized { |
| introduceContainer(warmNodejsKey, con) |
| } |
| info(this, "Started warm nodejs container") |
| con |
| } |
| |
| private def getWarmNodejsContainer(key: ActionContainerId)(implicit transid: TransactionId): Option[WhiskContainer] = |
| retrieve(warmNodejsKey) match { |
| case Success(con, _) => |
| info(this, s"Obtained a pre-warmed container") |
| con.transid = transid |
| val Some(ci) = containerMap.get(con) |
| changeKey(ci, warmNodejsKey, key) |
| Some(con.asInstanceOf[WhiskContainer]) |
| case _ => None |
| } |
| |
| // Obtain a container (by creation or promotion) and initialize by sending code. |
| private def makeWhiskContainer(action: WhiskAction, auth: WhiskAuth)(implicit transid: TransactionId): FinalContainerResult = { |
| val imageName = getDockerImageName(action) |
| val limits = action.limits |
| val nodeImageName = WhiskAction.containerImageName(nodejsExec, config.dockerRegistry, config.dockerImagePrefix, config.dockerImageTag) |
| val key = ActionContainerId(auth.uuid, action.fullyQualifiedName, action.rev) |
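| // a pre-warmed stem cell is only usable when the action runs the stock nodejs image |
| // with the default memory limit; anything else falls through to a cold start |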
| val warmedContainer = if (limits.memory == defaultMemoryLimit && imageName == nodeImageName) getWarmNodejsContainer(key) else None |
| val containerName = makeContainerName(action) |
| val con = warmedContainer getOrElse { |
| try { |
| info(this, s"making new container because none available") |
| startingCounter.next() |
| makeGeneralContainer(key, containerName, imageName, limits, action.exec.kind == BlackBoxExec) |
| } finally { |
| val newCount = startingCounter.prev() |
| info(this, s"made container, number of remaining containers to start: $newCount") |
| } |
| } |
| initWhiskContainer(action, con) |
| } |
| |
| // Make a container somewhat generically without introducing into data structure. |
| // There is access to global settings (docker registry) |
| // and generic settings (image name, static limits) but without access to WhiskAction. |
| private def makeGeneralContainer( |
| key: ActionContainerId, containerName: ContainerName, |
| imageName: String, limits: ActionLimits, pull: Boolean)( |
| implicit transid: TransactionId): WhiskContainer = { |
| |
| val network = config.invokerContainerNetwork |
| val cpuShare = ContainerPool.cpuShare(config) |
| val policy = config.invokerContainerPolicy |
| val env = getContainerEnvironment() |
| // This will start up the container |
| if (pull) runDockerPull { |
| ContainerUtils.pullImage(dockerhost, imageName) |
| } |
| runDockerOp { |
| // because of the docker lock, by the time the container gets around to be started |
| // there could be a container to reuse (from a previous run of the same action, or |
| // from a stem cell container); should revisit this logic |
| new WhiskContainer(transid, this.dockerhost, key, containerName, imageName, network, cpuShare, policy, env, limits, isBlackbox = pull, logLevel = this.getVerbosity()) |
| } |
| } |
| |
| // We send the payload here but eventually must also handle morphing a pre-allocated container into the right state. |
| private def initWhiskContainer(action: WhiskAction, con: WhiskContainer)(implicit transid: TransactionId): FinalContainerResult = { |
| con.boundParams = action.parameters.toJsObject |
| |
| // Then send it the init payload which is code for now |
| val initArg = action.containerInitializer |
| val initResult = con.init(initArg, action.limits.timeout.duration) |
| Success(con, Some(initResult)) |
| } |
| |
| /** |
| * Used for testing only. Creates a container running the command in `args`. |
| */ |
| private def makeContainer(key: ActionContainerId, imageName: String, args: Array[String])(implicit transid: TransactionId): FinalContainerResult = { |
| val con = runDockerOp { |
| new Container(transid, this.dockerhost, key, None, imageName, |
| config.invokerContainerNetwork, ContainerPool.cpuShare(config), |
| config.invokerContainerPolicy, ActionLimits(), Map(), args, |
| this.getVerbosity()) |
| } |
| con.setVerbosity(getVerbosity()) |
| Success(con, None) |
| } |
| |
| /** |
| * Adds the container into the data structure in an Idle state. |
| * The caller must have synchronized to maintain data structure atomicity. |
| */ |
| private def introduceContainer(key: ActionContainerId, container: Container)(implicit transid: TransactionId): ContainerInfo = { |
| val ci = new ContainerInfo(key, container) |
| keyMap.getOrElseUpdate(key, ListBuffer()) += ci |
| containerMap += container -> ci |
| dumpState("introduceContainer") |
| ci |
| } |
| |
| private def dumpState(prefix: String)(implicit transid: TransactionId) = { |
| debug(this, s"$prefix: keyMap = ${keyMapToString()}") |
| } |
| |
| private def getDockerImageName(action: WhiskAction)(implicit transid: TransactionId): String = { |
| val imageName = action.containerImageName(config.dockerRegistry, config.dockerImagePrefix, config.dockerImageTag) |
| debug(this, s"Using image ${imageName}") |
| imageName |
| } |
| |
| private def getContainerEnvironment(): Map[String, String] = { |
| Map(WhiskConfig.asEnvVar(WhiskConfig.edgeHostName) -> config.edgeHost) |
| } |
| |
| private val defaultMaxIdle = 10 |
| private val defaultGCThreshold = 600.seconds |
| private val slowDockerThreshold = 500.millis |
| private val slowDockerPullThreshold = 5.seconds |
| |
| val gcFrequency = 1000.milliseconds // this should not be leaked but a test needs this until GC count is implemented |
| private var _maxIdle = defaultMaxIdle |
| private var _maxActive = 0 |
| private var _gcThreshold = defaultGCThreshold |
| private var gcOn = true |
| private val gcSync = new Object() |
| resetMaxActive() |
| |
| private val timer = new Timer() |
| private val gcTask = new TimerTask { |
| def run() { |
| performGC()(TransactionId.invoker) |
| } |
| } |
| timer.scheduleAtFixedRate(gcTask, 0, gcFrequency.toMillis) |
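| // With the defaults above, an idle container is reclaimed roughly within |
| // gcThreshold (600s) + gcFrequency (1s) of its lastUsed timestamp. |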
| |
| /** |
| * Removes all idle containers older than the threshold. |
| */ |
| private def performGC()(implicit transid: TransactionId) = { |
| val expiration = System.currentTimeMillis() - gcThreshold.toMillis |
| removeAllIdle({ containerInfo => containerInfo.lastUsed <= expiration }) |
| dumpState("performGC") |
| } |
| |
| /** |
| * Removes all containers that are in the idle state and pass the predicate, queueing them for teardown. |
| * gcSync is used to prevent multiple GC's. |
| */ |
| private def removeAllIdle(pred: ContainerInfo => Boolean)(implicit transid: TransactionId) = { |
| gcSync.synchronized { |
| val idleInfo = this.synchronized { |
| val idle = containerMap filter { case (container, ci) => ci.isIdle() && pred(ci) } |
| idle.keys foreach { con => |
| info(this, s"removeAllIdle removing container ${con.id}") |
| } |
| containerMap --= idle.keys |
| keyMap foreach { case (key, ciList) => ciList --= idle.values } |
| keyMap retain { case (key, ciList) => !ciList.isEmpty } |
| idle.values |
| } |
| idleInfo.foreach(toBeRemoved.offer(_)) |
| } |
| } |
| |
| // Removes containerInfo from the data structures but does not perform any docker operation. |
| // The caller must establish synchronization. |
| private def removeContainerInfo(conInfo: ContainerInfo)(implicit transid: TransactionId) = { |
| containerMap -= conInfo.container |
| keyMap foreach { case (key, ciList) => ciList -= conInfo } |
| keyMap retain { case (key, ciList) => !ciList.isEmpty } |
| } |
| |
| private def removeOldestIdle()(implicit transid: TransactionId) = { |
| // Note that the container removal - if any - is done outside the synchronized block |
| val oldestIdle = this.synchronized { |
| val idle = (containerMap filter { case (container, ci) => ci.isIdle() }) |
| if (idle.isEmpty) |
| List() |
| else { |
| val oldestConInfo = idle.minBy(_._2.lastUsed)._2 |
| info(this, s"removeOldestIdle removing container ${oldestConInfo.container.id}") |
| removeContainerInfo(oldestConInfo) |
| List(oldestConInfo) |
| } |
| } |
| oldestIdle.foreach(toBeRemoved.offer(_)) |
| } |
| |
| // Getter/setter for this are above. |
| private var _logDir = "/logs" |
| private val actionContainerPrefix = "wsk" |
| |
| /** |
| * Actually deletes the container, after writing its docker logs to the log directory. |
| */ |
| private def teardownContainer(container: Container)(implicit transid: TransactionId) = { |
| val size = this.getLogSize(container, !standalone) |
| val rawLogBytes = container.synchronized { |
| this.getDockerLogContent(container.containerId, 0, size, !standalone) |
| } |
| val filename = s"${_logDir}/${container.name}.log" |
| Files.write(Paths.get(filename), rawLogBytes) |
| info(this, s"teardownContainers: wrote docker logs to $filename") |
| runDockerOp { container.remove() } |
| } |
| |
| /** |
| * Removes all containers with the actionContainerPrefix to kill leftover action containers. |
| * This is needed for startup and shutdown. |
| * Concurrent access from clients must be prevented. |
| */ |
| private def killStragglers(allContainers: Seq[ContainerState])(implicit transid: TransactionId) = { |
| val candidates = allContainers.filter { |
| _.name.name.startsWith(actionContainerPrefix) |
| } |
| |
| info(this, s"Now removing ${candidates.length} leftover containers") |
| |
| candidates foreach { c => |
| unpauseContainer(c.name) |
| rmContainer(c.name) |
| } |
| |
| info(this, s"Leftover container removal completed") |
| } |
| |
| /** |
| * Gets the size of the mounted file associated with this whisk container. |
| */ |
| def getLogSize(con: Container, mounted: Boolean)(implicit transid: TransactionId): Long = { |
| getDockerLogSize(con.containerId, mounted) |
| } |
| |
| nannyThread(listAll()(TransactionId.invokerWarmup)).start |
| if (!standalone) { |
| sys addShutdownHook { |
| warn(this, "Shutdown hook activated. Starting container shutdown") |
| shutdown() |
| warn(this, "Shutdown hook completed.") |
| } |
| } |
| |
| } |
| |
| /* |
| * These methods are parameterized on the configuration but defined here as an instance of ContainerPool is not |
| * always available from other call sites. |
| */ |
| object ContainerPool extends Logging { |
| def requiredProperties = Map( |
| selfDockerEndpoint -> "localhost", |
| dockerImageTag -> "latest", |
| invokerContainerNetwork -> "bridge", |
| invokerNumCore -> "4", |
| invokerCoreShare -> "2", |
| invokerContainerPolicy -> "") |
| |
| /* |
| * Extract parameters from whisk config. In the future, these may not be static but |
| * dynamically updated. They still serve as a starting point for downstream parameters. |
| */ |
| def numCore(config: WhiskConfig) = config.invokerNumCore.toInt |
| def shareFactor(config: WhiskConfig) = config.invokerCoreShare.toInt |
| |
| /* |
| * The default maximum number of active containers is simply the number of cores multiplied by the share factor. |
| */ |
| def getDefaultMaxActive(config: WhiskConfig) = numCore(config) * shareFactor(config) |
| |
| /* The shareFactor indicates the number of containers that would share a single core, on average. |
| * cpuShare is a docker option (-c) whereby a container's CPU access is limited. |
| * A value of 1024 is the full share so a strict resource division with a shareFactor of 2 would yield 512. |
| * On an idle/underloaded system, a container will still get to use underutilized CPU shares. |
| */ |
| private val totalShare = 1024.0 // This is a pre-defined value coming from docker and not our hard-coded value. |
| def cpuShare(config: WhiskConfig) = (totalShare / getDefaultMaxActive(config)).toInt |
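| // e.g. the default properties invokerNumCore = 4 and invokerCoreShare = 2 yield |
| // getDefaultMaxActive = 8 and cpuShare = (1024.0 / 8).toInt = 128 (docker's -c option) |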
| |
| } |