Rename dispatcher to invoker.
Fixes #1117.
diff --git a/core/dispatcher/Dockerfile b/core/dispatcher/Dockerfile
deleted file mode 100644
index 8f4472c..0000000
--- a/core/dispatcher/Dockerfile
+++ /dev/null
@@ -1,11 +0,0 @@
-FROM scala
-
-# Uncomment to fetch latest version of docker instead: RUN wget -qO- https://get.docker.com | sh
-# Install docker 1.9.0
-RUN wget --no-verbose -O /usr/bin/docker "https://get.docker.com/builds/Linux/x86_64/docker-1.9.0" && \
-chmod +x /usr/bin/docker
-
-COPY build/distributions/dispatcher.tar ./
-RUN tar xf dispatcher.tar
-
-EXPOSE 8080
diff --git a/core/dispatcher/build.gradle b/core/dispatcher/build.gradle
deleted file mode 100644
index c3251aa..0000000
--- a/core/dispatcher/build.gradle
+++ /dev/null
@@ -1,22 +0,0 @@
-apply plugin: 'scala'
-apply plugin: 'application'
-apply plugin: 'eclipse'
-
-ext.dockerImageName = 'dispatcher'
-apply from: '../../gradle/docker.gradle'
-distDocker.dependsOn ':common:scala:distDocker', 'distTar'
-
-repositories {
- mavenCentral()
-}
-
-dependencies {
- compile "org.scala-lang:scala-library:${gradle.scala.version}"
- compile project(':common:scala')
-}
-
-tasks.withType(ScalaCompile) {
- scalaCompileOptions.additionalParameters = gradle.scala.compileFlags
-}
-
-mainClassName = "whisk.core.dispatcher.Dispatcher"
diff --git a/core/dispatcher/src/main/resources/application.conf b/core/dispatcher/src/main/resources/application.conf
deleted file mode 100644
index 688d81c..0000000
--- a/core/dispatcher/src/main/resources/application.conf
+++ /dev/null
@@ -1,7 +0,0 @@
-# common logging configuration see common scala
-include "logging"
-
-# see http://spray.io/documentation/1.2.2/spray-can/configuration/
-spray.can.server {
- request-timeout = infinite
-}
\ No newline at end of file
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/Container.scala b/core/dispatcher/src/main/scala/whisk/core/container/Container.scala
deleted file mode 100644
index a7e64ab..0000000
--- a/core/dispatcher/src/main/scala/whisk/core/container/Container.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Copyright 2015-2016 IBM Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.container
-
-import scala.annotation.tailrec
-
-import akka.event.Logging.LogLevel
-
-import whisk.core.WhiskConfig.selfDockerEndpoint
-import whisk.core.WhiskConfig.invokerContainerNetwork
-import whisk.core.entity.ActionLimits
-import whisk.common.TransactionId
-import whisk.common.Counter
-
-/**
- * Reifies a docker container.
- */
-class Container(
- originalId: TransactionId,
- val dockerhost: String,
- val key: ActionContainerId,
- containerName: Option[ContainerName],
- val image: String,
- network: String,
- cpuShare: Int,
- policy: Option[String],
- val limits: ActionLimits = ActionLimits(),
- env: Map[String, String] = Map(),
- args: Array[String] = Array(),
- logLevel: LogLevel)
- extends ContainerUtils {
-
- setVerbosity(logLevel)
-
- implicit var transid = originalId
-
- val id = Container.idCounter.next()
- val name = containerName.getOrElse("anon")
-
- val (containerId, containerHostAndPort) = bringup(containerName, image, network, cpuShare, env, args, limits, policy)
-
- def details: String = {
- val name = containerName getOrElse "??"
- val id = containerId.id
- val ip = containerHostAndPort getOrElse "??"
- s"container [$name] [$id] [$ip]"
- }
-
- def pause(): Unit = pauseContainer(containerId)
-
- def unpause(): Unit = unpauseContainer(containerId)
-
- /**
- * A prefix of the container id known to be displayed by docker ps.
- */
- lazy val containerIdPrefix: String = {
- // docker ps contains only a prefix of the id
- containerId.id.take(8)
- }
-
- /**
- * Gets logs for container.
- */
- def getLogs()(implicit transid: TransactionId): String = {
- getContainerLogs(containerId).toOption getOrElse ""
- }
-
- /**
- * Unpauses and removes a container (it may be running).
- */
- @tailrec
- final def remove(tryCount: Int = Container.removeContainerRetryCount)(implicit transid: TransactionId): Unit = {
- if (tryCount <= 0) {
- error(this, s"Failed to remove container $containerId")
- } else {
- if (tryCount == Container.removeContainerRetryCount) {
- info(this, s"Removing container $containerId")
- } else {
- warn(this, s"Retrying to remove container $containerId")
- }
- unpause() // a paused container cannot be removed
- rmContainer(containerId).toOption match {
- case None => remove(tryCount - 1)
- case _ => ()
- }
- }
- }
-}
-
-object Container {
- def requiredProperties = Map(selfDockerEndpoint -> null, invokerContainerNetwork -> "bridge")
- private val idCounter = new Counter()
- private val removeContainerRetryCount = 2
-}
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala b/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala
deleted file mode 100644
index dfc9da7..0000000
--- a/core/dispatcher/src/main/scala/whisk/core/container/ContainerPool.scala
+++ /dev/null
@@ -1,777 +0,0 @@
-/*
- * Copyright 2015-2016 IBM Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.container
-
-import java.nio.file.Files
-import java.nio.file.Paths
-import java.util.concurrent.locks.ReentrantLock
-import java.util.Timer
-import java.util.TimerTask
-import java.util.concurrent.ConcurrentLinkedQueue
-import scala.collection.concurrent.TrieMap
-import scala.collection.mutable.ListBuffer
-import scala.concurrent.duration._
-import scala.annotation.tailrec
-import akka.actor.ActorSystem
-import whisk.common.Counter
-import whisk.common.Logging
-import whisk.common.TimingUtil
-import whisk.common.TransactionId
-import whisk.core.WhiskConfig
-import whisk.core.WhiskConfig.dockerImageTag
-import whisk.core.WhiskConfig.invokerContainerNetwork
-import whisk.core.WhiskConfig.invokerContainerPolicy
-import whisk.core.WhiskConfig.invokerCoreShare
-import whisk.core.WhiskConfig.invokerNumCore
-import whisk.core.WhiskConfig.selfDockerEndpoint
-import whisk.core.entity.ActionLimits
-import whisk.core.entity.MemoryLimit
-import whisk.core.entity.LogLimit
-import whisk.core.entity.TimeLimit
-import whisk.core.entity.WhiskAction
-import whisk.core.entity.WhiskAuth
-import whisk.core.entity.WhiskAuthStore
-import whisk.core.entity.WhiskEntityStore
-import whisk.core.entity.NodeJS6Exec
-import akka.event.Logging.LogLevel
-import akka.event.Logging.InfoLevel
-import whisk.core.entity.BlackBoxExec
-
-/**
- * A thread-safe container pool that internalizes container creation/teardown and allows users
- * to check out a container.
- *
- * Synchronization via "this" is used to maintain integrity of the data structures.
- * A separate object "gcSync" is used to prevent multiple GC's from occurring.
- *
- * TODO: for now supports only one container per key
- * TODO: for now does not allow concurrent container creation
- */
-class ContainerPool(
- config: WhiskConfig,
- invokerInstance: Integer = 0,
- verbosity: LogLevel = InfoLevel,
- standalone: Boolean = false)(implicit actorSystem: ActorSystem)
- extends ContainerUtils {
-
- // These must be defined before verbosity is set
- private val datastore = WhiskEntityStore.datastore(config)
- private val authStore = WhiskAuthStore.datastore(config)
- setVerbosity(verbosity)
-
- val dockerhost = config.selfDockerEndpoint
-
- // Eventually, we will have a more sophisticated warmup strategy that does multiple sizes
- private val defaultMemoryLimit = MemoryLimit(MemoryLimit.STD_MEMORY)
-
- /**
- * Sets verbosity of this and owned objects.
- */
- override def setVerbosity(level: LogLevel) = {
- super.setVerbosity(level)
- datastore.setVerbosity(level)
- authStore.setVerbosity(level)
- }
-
- /**
- * Enables GC.
- */
- def enableGC(): Unit = {
- gcOn = true
- }
-
- /**
- * Disables GC. If disabled, overrides other flags/methods.
- */
- def disableGC(): Unit = {
- gcOn = false
- }
-
- /**
- * Performs a GC immediately of all idle containers, blocking the caller until completed.
- */
- def forceGC()(implicit transid: TransactionId): Unit = {
- removeAllIdle({ containerInfo => true })
- }
-
- /*
- * Getter/Setter for various GC parameters.
- */
- def gcThreshold: FiniteDuration = _gcThreshold
- def maxIdle: Int = _maxIdle // container count
- def maxActive: Int = _maxActive // container count
- def gcThreshold_=(value: FiniteDuration): Unit = _gcThreshold = (Duration.Zero max value)
- def maxIdle_=(value: Int): Unit = _maxIdle = Math.max(0, value)
- def maxActive_=(value: Int): Unit = _maxActive = Math.max(0, value)
-
- def resetMaxIdle() = _maxIdle = defaultMaxIdle
- def resetMaxActive() = {
- _maxActive = ContainerPool.getDefaultMaxActive(config)
- info(this, s"maxActive set to ${_maxActive}")
- }
- def resetGCThreshold() = _gcThreshold = defaultGCThreshold
-
- /*
- * Controls where docker container logs are put.
- */
- def logDir: String = _logDir
- def logDir_=(value: String): Unit = _logDir = value
-
- /*
- * How many containers are in the pool at the moment?
- * There are also counts of containers we are trying to start but have not inserted into the data structure.
- */
- def idleCount() = countByState(State.Idle)
- def activeCount() = countByState(State.Active)
- private val startingCounter = new Counter()
- private var shuttingDown = false
-
- /*
- * Tracks requests for getting containers.
- * The first value doled out via nextPosition.next() will be 1 and completedPosition.cur remains at 0 until completion.
- */
- private val nextPosition = new Counter()
- private val completedPosition = new Counter()
-
- /*
- * Lists ALL containers at this docker point with "docker ps -a --no-trunc".
- * This could include containers not in this pool at all.
- */
- def listAll()(implicit transid: TransactionId): Seq[ContainerState] = listContainers(true)
-
- /**
- * Retrieves (possibly create) a container based on the subject and versioned action.
- * A flag is included to indicate whether initialization succeeded.
- * The invariant of returning the container back to the pool holds regardless of whether init succeeded or not.
- * In case of failure to start a container, None is returned.
- */
- def getAction(action: WhiskAction, auth: WhiskAuth)(implicit transid: TransactionId): Option[(WhiskContainer, Option[RunResult])] =
- if (shuttingDown) {
- info(this, s"Shutting down: Not getting container for ${action.fullyQualifiedName} with ${auth.uuid}")
- None
- } else {
- try {
- val myPos = nextPosition.next()
- info(this, s"""Getting container for ${action.fullyQualifiedName} of kind ${action.exec.kind} with ${auth.uuid}:
- | myPos = $myPos
- | completed = ${completedPosition.cur}
- | slack = ${slack()}
- | startingCounter = ${startingCounter.cur}""".stripMargin)
- val key = ActionContainerId(auth.uuid, action.fullyQualifiedName, action.rev)
- getImpl(1, myPos, key, () => makeWhiskContainer(action, auth)) map {
- case (c, initResult) =>
- val cacheMsg = if (!initResult.isDefined) "(Cache Hit)" else "(Cache Miss)"
- (c.asInstanceOf[WhiskContainer], initResult)
- }
- } finally {
- completedPosition.next()
- }
- }
-
- /*
- * For testing
- */
- def getByImageName(imageName: String, args: Array[String])(implicit transid: TransactionId): Option[Container] = {
- info(this, s"Getting container for image $imageName with args " + args.mkString(" "))
- // Not a regular key. Doesn't matter in testing.
- val key = new ActionContainerId(s"instantiated." + imageName + args.mkString("_"))
- getImpl(1, 0, key, () => makeContainer(key, imageName, args)) map { _._1 }
- }
-
- /**
- * Tries to get/create a container via the thunk by delegating to getOrMake.
- * This method will apply retry so that the caller is blocked until retry succeeds.
- */
- @tailrec
- final def getImpl(tryCount: Int, position: Int, key: ActionContainerId, conMaker: () => FinalContainerResult)(implicit transid: TransactionId): Option[(Container, Option[RunResult])] = {
- val positionInLine = position - completedPosition.cur // this will be 1 if at the front of the line
- val available = slack()
- if (tryCount % 100 == 0) {
- warn(this, s"""getImpl possibly stuck because still in line:
- | position = $position
- | completed = ${completedPosition.cur}
- | slack = $available
- | maxActive = ${_maxActive}
- | activeCount = ${activeCount()}
- | startingCounter = ${startingCounter.cur}""".stripMargin)
- }
- if (positionInLine > available) { // e.g. if there is 1 available, then I wait if I am second in line (positionInLine = 2)
- Thread.sleep(50) // TODO: replace with wait/notify but tricky to get right because of desire for maximal concurrency
- } else getOrMake(key, conMaker) match {
- case Success(con, initResult) =>
- info(this, s"Obtained container ${con.containerId.id}")
- return Some(con, initResult)
- case Error(str) =>
- error(this, s"Error starting container: $str")
- return None
- case Busy =>
- // This will not cause a busy loop because only those that could be productive will get a chance
- }
- getImpl(tryCount + 1, position, key, conMaker)
- }
-
- def getNumberOfIdleContainers(key: ActionContainerId)(implicit transid: TransactionId): Int = {
- this.synchronized {
- keyMap.get(key) map { bucket => bucket.count { _.isIdle() } } getOrElse 0
- }
- }
-
- /*
- * How many containers can we start? Someone could have fully started a container so we must include startingCounter.
- * The use of a method rather than a getter is meant to signify the synchronization in the implementation.
- */
- private def slack() = _maxActive - (activeCount() + startingCounter.cur)
-
- /*
- * Try to get or create a container, returning None if there are too many
- * active containers.
- *
- * The multiple synchronization block, and the use of startingCounter,
- * is needed to make sure container count is accurately tracked,
- * data structure maintains integrity, but to keep all length operations
- * outside of the lock.
- *
- * The returned container will be active (not pause).
- */
- def getOrMake(key: ActionContainerId, conMaker: () => FinalContainerResult)(implicit transid: TransactionId): FinalContainerResult = {
- retrieve(key) match {
- case CacheMiss => {
- conMaker() match { /* We make the container outside synchronization */
- // Unfortunately, variables are not allowed in pattern alternatives even when the types line up.
- case res @ Success(con, initResult) =>
- this.synchronized {
- val ci = introduceContainer(key, con)
- ci.state = State.Active
- res
- }
- case res @ Error(_) => res
- case Busy =>
- assert(false)
- null // conMaker only returns Success or Error
- }
- }
- case s @ Success(con, initResult) =>
- con.transid = transid
- runDockerOp { con.unpause() }
- s
- case b @ Busy => b
- case e @ Error(_) => e
- }
- }
-
- /**
- * Obtains a pre-existing container from the pool - and putting it to Active state but without docker unpausing.
- * If we are over capacity, signal Busy.
- * If it does not exist ready to do, indicate a miss.
- */
- def retrieve(key: ActionContainerId)(implicit transid: TransactionId): ContainerResult = {
- this.synchronized {
- // first check if there is a matching container and only if there aren't any
- // determine if the pool is full or has capacity to accommodate a new container;
- // this allows any new containers introduced into the pool to be reused if already idle
- val bucket = keyMap.getOrElseUpdate(key, new ListBuffer())
- bucket.find({ ci => ci.isIdle() }) match {
- case None =>
- if (activeCount() + startingCounter.cur >= _maxActive) {
- Busy
- } else {
- CacheMiss
- }
- case Some(ci) => {
- ci.state = State.Active
- Success(ci.container, None)
- }
- }
- }
- }
-
- /**
- * Moves a container from one bucket (i.e. key) to a different one.
- * This operation is performed when we specialize a pre-warmed container to an action.
- * ContainerMap does not need to be updated as the Container <-> ContainerInfo relationship does not change.
- */
- def changeKey(ci: ContainerInfo, oldKey: ActionContainerId, newKey: ActionContainerId)(implicit transid: TransactionId) = {
- this.synchronized {
- assert(ci.state == State.Active)
- assert(keyMap.contains(oldKey))
- val oldBucket = keyMap(oldKey)
- val newBucket = keyMap.getOrElseUpdate(newKey, new ListBuffer())
- oldBucket -= ci
- newBucket += ci
- }
- }
-
- /**
- * Returns the container to the pool or delete altogether.
- * This call can be slow but not while locking data structure so it does not interfere with other activations.
- */
- def putBack(container: Container, delete: Boolean = false)(implicit transid: TransactionId): Unit = {
- info(this, s"""putBack returning container ${container.id}
- | delete = $delete
- | completed = ${completedPosition.cur}
- | slack = ${slack()}
- | maxActive = ${_maxActive}
- | activeCount = ${activeCount()}
- | startingCounter = ${startingCounter.cur}""".stripMargin)
- // Docker operation outside sync block. Don't pause if we are deleting.
- if (!delete) {
- runDockerOp {
- // pausing eagerly is pessimal; there could be an action waiting
- // that will immediately unpause the same container to reuse it;
- // to skip pausing, will need to inspect the queue of waiting activations
- // for a matching key
- container.pause()
- }
- }
-
- val toBeDeleted = this.synchronized { // Return container to pool logically and then optionally delete
- // Always put back logically for consistency
- val Some(ci) = containerMap.get(container)
- assert(ci.state == State.Active)
- ci.lastUsed = System.currentTimeMillis()
- ci.state = State.Idle
- val toBeDeleted = if (delete) {
- removeContainerInfo(ci) // no docker operation here
- List(ci)
- } else {
- List()
- }
- this.notify()
- toBeDeleted
- }
-
- toBeDeleted.foreach(toBeRemoved.offer(_))
- // Perform capacity-based GC here.
- if (gcOn) { // Synchronization occurs inside calls in a fine-grained manner.
- while (idleCount() > _maxIdle) { // it is safe for this to be non-atomic with body
- removeOldestIdle()
- }
- }
- }
-
- // ------------------------------------------------------------------------------------------------------------
-
- object State extends Enumeration {
- val Idle, Active = Value
- }
-
- /**
- * Wraps a Container to allow a ContainerPool-specific information.
- */
- class ContainerInfo(k: ActionContainerId, con: Container) {
- val key = k
- val container = con
- var state = State.Idle
- var lastUsed = System.currentTimeMillis()
- def isIdle() = state == State.Idle
- }
-
- private val containerMap = new TrieMap[Container, ContainerInfo]
- private val keyMap = new TrieMap[ActionContainerId, ListBuffer[ContainerInfo]]
-
- // These are containers that are already removed from the data structure waiting to be docker-removed
- private val toBeRemoved = new ConcurrentLinkedQueue[ContainerInfo]()
-
- // Note that the prefix separates the name space of this from regular keys.
- // TODO: Generalize across language by storing image name when we generalize to other languages
- // Better heuristic for # of containers to keep warm - make sensitive to idle capacity
- private val warmNodejsKey = WarmNodeJsActionContainerId
- private val nodejsExec = NodeJS6Exec("", None)
- private val WARM_NODEJS_CONTAINERS = 2
-
- private def keyMapToString(): String = {
- keyMap.map(p => s"[${p._1.stringRepr} -> ${p._2}]").mkString(" ")
- }
-
- // Easier to walk containerMap than keyMap
- private def countByState(state: State.Value) = this.synchronized { containerMap.count({ case (_, ci) => ci.state == state }) }
-
- // Sample container name: wsk1_1_joeibmcomhelloWorldDemo_20150901T202701852Z
- private def makeContainerName(localName: String): ContainerName =
- ContainerCounter.containerName(invokerInstance.toString(), localName)
-
- private def makeContainerName(action: WhiskAction): ContainerName =
- makeContainerName(action.fullyQualifiedName)
-
- /**
- * dockerLock is a fair lock used to serialize all docker operations except pull.
- * However, a non-pull operation can run concurrently with a pull operation.
- */
- val dockerLock = new ReentrantLock(true)
-
- /**
- * dockerPullLock is used to serialize all pull operations.
- */
- val dockerPullLock = new ReentrantLock(true)
-
- /* A background thread that
- * 1. Kills leftover action containers on startup
- * 2. Periodically re-populates the container pool with fresh (un-instantiated) nodejs containers.
- * 3. Periodically tears down containers that have logically been removed from the system
- */
- private def nannyThread(allContainers: Seq[ContainerState]) = new Thread {
- override def run {
- implicit val tid = TransactionId.invokerWarmup
- if (!standalone) killStragglers(allContainers)
- while (true) {
- Thread.sleep(100) // serves to prevent busy looping
- // create a new stem cell if the number of warm containers is less than the count allowed
- // as long as there is slack so that any actions that may be waiting to create a container
- // are not held back; Note since this method is not fully synchronized, it is possible to
- // start this operation while there is slack and end up waiting on the docker lock later
- if (!standalone && getNumberOfIdleContainers(warmNodejsKey) < WARM_NODEJS_CONTAINERS && slack() > 0) {
- makeWarmNodejsContainer()(tid)
- }
- // We grab the size first so we know there has been enough delay for anything we are shutting down
- val size = toBeRemoved.size()
- 1 to size foreach { _ =>
- val ci = toBeRemoved.poll()
- if (ci != null) { // should never happen but defensive
- Thread.sleep(100) // serves to not hog docker lock and add slack
- teardownContainer(ci.container)
- }
- }
- }
- }
- }
-
- /**
- * Gracefully terminates by shutting down containers upon SIGTERM.
- * If one desires to kill the invoker without this, send it SIGKILL.
- */
- private def shutdown() = {
- implicit val id = TransactionId.invokerWarmup
- shuttingDown = true
- killStragglers(listAll())
- }
-
- /**
- * All docker operations from the pool must pass through here (except for pull).
- */
- private def runDockerOp[T](dockerOp: => T)(implicit transid: TransactionId): T = {
- runDockerOpWithLock(dockerLock, dockerOp)
- }
-
- /**
- * All docker pull operations from the pool must pass through here.
- */
- private def runDockerPull[T](dockerOp: => T)(implicit transid: TransactionId): T = {
- runDockerOpWithLock(dockerPullLock, dockerOp)
- }
-
- /**
- * All docker operations from the pool must pass through here (except for pull).
- */
- private def runDockerOpWithLock[T](lock: ReentrantLock, dockerOp: => T)(implicit transid: TransactionId): T = {
- lock.lock()
- try {
- val (elapsed, result) = TimingUtil.time { dockerOp }
- if (elapsed > slowDockerThreshold) {
- warn(this, s"Docker operation took $elapsed")
- }
- result
- } finally {
- lock.unlock()
- }
- }
-
- private def makeWarmNodejsContainer()(implicit transid: TransactionId): WhiskContainer = {
- val imageName = WhiskAction.containerImageName(nodejsExec, config.dockerRegistry, config.dockerImagePrefix, config.dockerImageTag)
- val limits = ActionLimits(TimeLimit(), defaultMemoryLimit, LogLimit())
- val containerName = makeContainerName("warmJsContainer")
-
- info(this, "Starting warm nodejs container")
-
- val con = makeGeneralContainer(warmNodejsKey, containerName, imageName, limits, false)
-
- this.synchronized {
- introduceContainer(warmNodejsKey, con)
- }
- info(this, "Started warm nodejs container")
- con
- }
-
- private def getWarmNodejsContainer(key: ActionContainerId)(implicit transid: TransactionId): Option[WhiskContainer] =
- retrieve(warmNodejsKey) match {
- case Success(con, _) =>
- info(this, s"Obtained a pre-warmed container")
- con.transid = transid
- val Some(ci) = containerMap.get(con)
- changeKey(ci, warmNodejsKey, key)
- Some(con.asInstanceOf[WhiskContainer])
- case _ => None
- }
-
- // Obtain a container (by creation or promotion) and initialize by sending code.
- private def makeWhiskContainer(action: WhiskAction, auth: WhiskAuth)(implicit transid: TransactionId): FinalContainerResult = {
- val imageName = getDockerImageName(action)
- val limits = action.limits
- val nodeImageName = WhiskAction.containerImageName(nodejsExec, config.dockerRegistry, config.dockerImagePrefix, config.dockerImageTag)
- val key = ActionContainerId(auth.uuid, action.fullyQualifiedName, action.rev)
- val warmedContainer = if (limits.memory == defaultMemoryLimit && imageName == nodeImageName) getWarmNodejsContainer(key) else None
- val containerName = makeContainerName(action)
- val con = warmedContainer getOrElse {
- try {
- info(this, s"making new container because none available")
- startingCounter.next()
- makeGeneralContainer(key, containerName, imageName, limits, action.exec.kind == BlackBoxExec)
- } finally {
- val newCount = startingCounter.prev()
- info(this, s"made container, number of remaining containers to start: $newCount")
- }
- }
- initWhiskContainer(action, con)
- }
-
- // Make a container somewhat generically without introducing into data structure.
- // There is access to global settings (docker registry)
- // and generic settings (image name - static limits) but without access to WhiskAction.
- private def makeGeneralContainer(
- key: ActionContainerId, containerName: ContainerName,
- imageName: String, limits: ActionLimits, pull: Boolean)(
- implicit transid: TransactionId): WhiskContainer = {
-
- val network = config.invokerContainerNetwork
- val cpuShare = ContainerPool.cpuShare(config)
- val policy = config.invokerContainerPolicy
- val env = getContainerEnvironment()
- // This will start up the container
- if (pull) runDockerPull {
- ContainerUtils.pullImage(dockerhost, imageName)
- }
- runDockerOp {
- // because of the docker lock, by the time the container gets around to be started
- // there could be a container to reuse (from a previous run of the same action, or
- // from a stem cell container); should revisit this logic
- new WhiskContainer(transid, this.dockerhost, key, containerName, imageName, network, cpuShare, policy, env, limits, isBlackbox = pull, logLevel = this.getVerbosity())
- }
- }
-
- // We send the payload here but eventually must also handle morphing a pre-allocated container into the right state.
- private def initWhiskContainer(action: WhiskAction, con: WhiskContainer)(implicit transid: TransactionId): FinalContainerResult = {
- con.boundParams = action.parameters.toJsObject
-
- // Then send it the init payload which is code for now
- val initArg = action.containerInitializer
- val initResult = con.init(initArg, action.limits.timeout.duration)
- Success(con, Some(initResult))
- }
-
- /**
- * Used through testing only. Creates a container running the command in `args`.
- */
- private def makeContainer(key: ActionContainerId, imageName: String, args: Array[String])(implicit transid: TransactionId): FinalContainerResult = {
- val con = runDockerOp {
- new Container(transid, this.dockerhost, key, None, imageName,
- config.invokerContainerNetwork, ContainerPool.cpuShare(config),
- config.invokerContainerPolicy, ActionLimits(), Map(), args,
- this.getVerbosity())
- }
- con.setVerbosity(getVerbosity())
- Success(con, None)
- }
-
- /**
- * Adds the container into the data structure in an Idle state.
- * The caller must have synchronized to maintain data structure atomicity.
- */
- private def introduceContainer(key: ActionContainerId, container: Container)(implicit transid: TransactionId): ContainerInfo = {
- val ci = new ContainerInfo(key, container)
- keyMap.getOrElseUpdate(key, ListBuffer()) += ci
- containerMap += container -> ci
- dumpState("introduceContainer")
- ci
- }
-
- private def dumpState(prefix: String)(implicit transid: TransactionId) = {
- debug(this, s"$prefix: keyMap = ${keyMapToString()}")
- }
-
- private def getDockerImageName(action: WhiskAction)(implicit transid: TransactionId): String = {
- val imageName = action.containerImageName(config.dockerRegistry, config.dockerImagePrefix, config.dockerImageTag)
- debug(this, s"Using image ${imageName}")
- imageName
- }
-
- private def getContainerEnvironment(): Map[String, String] = {
- Map(WhiskConfig.asEnvVar(WhiskConfig.edgeHostName) -> config.edgeHost)
- }
-
- private val defaultMaxIdle = 10
- private val defaultGCThreshold = 600.seconds
- private val slowDockerThreshold = 500.millis
- private val slowDockerPullThreshold = 5.seconds
-
- val gcFrequency = 1000.milliseconds // this should not be leaked but a test needs this until GC count is implemented
- private var _maxIdle = defaultMaxIdle
- private var _maxActive = 0
- private var _gcThreshold = defaultGCThreshold
- private var gcOn = true
- private val gcSync = new Object()
- resetMaxActive()
-
- private val timer = new Timer()
- private val gcTask = new TimerTask {
- def run() {
- performGC()(TransactionId.invoker)
- }
- }
- timer.scheduleAtFixedRate(gcTask, 0, gcFrequency.toMillis)
-
- /**
- * Removes all idle containers older than the threshold.
- */
- private def performGC()(implicit transid: TransactionId) = {
- val expiration = System.currentTimeMillis() - gcThreshold.toMillis
- removeAllIdle({ containerInfo => containerInfo.lastUsed <= expiration })
- dumpState("performGC")
- }
-
- /**
- * Collects all containers that are in the idle state and pass the predicate.
- * gcSync is used to prevent multiple GC's.
- */
- private def removeAllIdle(pred: ContainerInfo => Boolean)(implicit transid: TransactionId) = {
- gcSync.synchronized {
- val idleInfo = this.synchronized {
- val idle = containerMap filter { case (container, ci) => ci.isIdle() && pred(ci) }
- idle.keys foreach { con =>
- info(this, s"removeAllIdle removing container ${con.id}")
- }
- containerMap --= idle.keys
- keyMap foreach { case (key, ciList) => ciList --= idle.values }
- keyMap retain { case (key, ciList) => !ciList.isEmpty }
- idle.values
- }
- idleInfo.foreach(toBeRemoved.offer(_))
- }
- }
-
- // Remove containerInfo from data structures but does not perform actual container operation.
- // Caller must establish synchronization
- private def removeContainerInfo(conInfo: ContainerInfo)(implicit transid: TransactionId) = {
- containerMap -= conInfo.container
- keyMap foreach { case (key, ciList) => ciList -= conInfo }
- keyMap retain { case (key, ciList) => !ciList.isEmpty }
- }
-
- private def removeOldestIdle()(implicit transid: TransactionId) = {
- // Note that the container removal - if any - is done outside the synchronized block
- val oldestIdle = this.synchronized {
- val idle = (containerMap filter { case (container, ci) => ci.isIdle() })
- if (idle.isEmpty)
- List()
- else {
- val oldestConInfo = idle.minBy(_._2.lastUsed)._2
- info(this, s"removeOldestIdle removing container ${oldestConInfo.container.id}")
- removeContainerInfo(oldestConInfo)
- List(oldestConInfo)
- }
- }
- oldestIdle.foreach(toBeRemoved.offer(_))
- }
-
- // Getter/setter for this are above.
- private var _logDir = "/logs"
- private val actionContainerPrefix = "wsk"
-
- /**
- * Actually deletes the containers.
- */
- private def teardownContainer(container: Container)(implicit transid: TransactionId) = {
- val size = this.getLogSize(container, !standalone)
- val rawLogBytes = container.synchronized {
- this.getDockerLogContent(container.containerId, 0, size, !standalone)
- }
- val filename = s"${_logDir}/${container.name}.log"
- Files.write(Paths.get(filename), rawLogBytes)
- info(this, s"teardownContainers: wrote docker logs to $filename")
- runDockerOp { container.remove() }
- }
-
- /**
- * Removes all containers with the actionContainerPrefix to kill leftover action containers.
- * This is needed for startup and shutdown.
- * Concurrent access from clients must be prevented.
- */
- private def killStragglers(allContainers: Seq[ContainerState])(implicit transid: TransactionId) = {
- val candidates = allContainers.filter {
- _.name.name.startsWith(actionContainerPrefix)
- }
-
- info(this, s"Now removing ${candidates.length} leftover containers")
-
- candidates foreach { c =>
- unpauseContainer(c.name)
- rmContainer(c.name)
- }
-
- info(this, s"Leftover container removal completed")
- }
-
- /**
- * Gets the size of the mounted file associated with this whisk container.
- */
- def getLogSize(con: Container, mounted: Boolean)(implicit transid: TransactionId): Long = {
- getDockerLogSize(con.containerId, mounted)
- }
-
- nannyThread(listAll()(TransactionId.invokerWarmup)).start
- if (!standalone) {
- sys addShutdownHook {
- warn(this, "Shutdown hook activated. Starting container shutdown")
- shutdown()
- warn(this, "Shutdown hook completed.")
- }
- }
-
-}
-
-/*
- * These methods are parameterized on the configuration but defined here as an instance of ContainerPool is not
- * always available from other call sites.
- */
-object ContainerPool extends Logging {
- def requiredProperties = Map(
- selfDockerEndpoint -> "localhost",
- dockerImageTag -> "latest",
- invokerContainerNetwork -> "bridge",
- invokerNumCore -> "4",
- invokerCoreShare -> "2",
- invokerContainerPolicy -> "")
-
- /*
- * Extract parameters from whisk config. In the future, these may not be static but
- * dynamically updated. They still serve as a starting point for downstream parameters.
- */
- def numCore(config: WhiskConfig) = config.invokerNumCore.toInt
- def shareFactor(config: WhiskConfig) = config.invokerCoreShare.toInt
-
- /*
- * The total number of containers is simply the number of cores dilated by the cpu sharing.
- */
- def getDefaultMaxActive(config: WhiskConfig) = numCore(config) * shareFactor(config)
-
- /* The shareFactor indicates the number of containers that would share a single core, on average.
- * cpuShare is a docker option (-c) whereby a container's CPU access is limited.
- * A value of 1024 is the full share so a strict resource division with a shareFactor of 2 would yield 512.
- * On an idle/underloaded system, a container will still get to use underutilized CPU shares.
- */
- private val totalShare = 1024.0 // This is a pre-defined value coming from docker and not our hard-coded value.
- def cpuShare(config: WhiskConfig) = (totalShare / getDefaultMaxActive(config)).toInt
-
-}
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/ContainerUtils.scala b/core/dispatcher/src/main/scala/whisk/core/container/ContainerUtils.scala
deleted file mode 100644
index 29becb4..0000000
--- a/core/dispatcher/src/main/scala/whisk/core/container/ContainerUtils.scala
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Copyright 2015-2016 IBM Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.container
-
-import whisk.common.Logging
-import whisk.common.SimpleExec
-import whisk.common.TransactionId
-import whisk.core.entity.ActionLimits
-import java.io.File
-import java.io.FileNotFoundException
-import scala.util.Try
-import scala.language.postfixOps
-import whisk.common.LoggingMarkers
-import akka.event.Logging.ErrorLevel
-import whisk.common.PrintStreamEmitter
-
-/**
- * Information from docker ps.
- */
-case class ContainerState(id: ContainerHash, image: String, name: ContainerName)
-
-trait ContainerUtils extends Logging {
-
- /** Defines the docker host, optional **/
- val dockerhost: String
-
- def makeEnvVars(env: Map[String, String]): Array[String] = {
- env.map {
- kv => s"-e ${kv._1}=${kv._2}"
- }.mkString(" ").split(" ").filter { x => x.nonEmpty }
- }
-
- /**
- * Creates a container instance and runs it.
- *
- * @param image the docker image to run
- * @return container id and container host
- */
- def bringup(name: Option[ContainerName], image: String, network: String, cpuShare: Int, env: Map[String, String], args: Array[String], limits: ActionLimits, policy: Option[String])(implicit transid: TransactionId): (ContainerHash, Option[ContainerAddr]) = {
- val id = makeContainer(name, image, network, cpuShare, env, args, limits, policy)
- val host = getContainerHostAndPort(id)
- (id, host)
- }
-
- /**
- * Pulls container images.
- */
- def pullImage(image: String)(implicit transid: TransactionId): DockerOutput = ContainerUtils.pullImage(dockerhost, image)
-
- /*
- * TODO: The file handle and process limits should be moved to some global limits config.
- */
- def makeContainer(name: Option[ContainerName], image: String, network: String, cpuShare: Int, env: Map[String, String], args: Seq[String], limits: ActionLimits, policy: Option[String])(implicit transid: TransactionId): ContainerHash = {
- val nameOption = name.map(n => Array("--name", n.name)).getOrElse(Array.empty[String])
- val cpuArg = Array("-c", cpuShare.toString)
- val memoryArg = Array("-m", s"${limits.memory()}m")
- val capabilityArg = Array("--cap-drop", "NET_RAW", "--cap-drop", "NET_ADMIN")
- val consulServiceIgnore = Array("-e", "SERVICE_IGNORE=true")
- val fileHandleLimit = Array("--ulimit", "nofile=64:64")
- val processLimit = Array("--ulimit", "nproc=512:512")
- val securityOpts = policy map { p => Array("--security-opt", s"apparmor:${p}") } getOrElse (Array.empty[String])
- val containerNetwork = Array("--net", network)
-
- val cmd = Seq("run") ++ makeEnvVars(env) ++ consulServiceIgnore ++ nameOption ++ cpuArg ++ memoryArg ++
- capabilityArg ++ fileHandleLimit ++ processLimit ++ securityOpts ++ containerNetwork ++ Seq("-d", image) ++ args
-
- runDockerCmd(cmd: _*).toOption.map { result =>
- ContainerHash.fromString(result)
- } getOrElse {
- throw new Exception("Container hash or name expected in `makeContainer`.")
- }
- }
-
- def killContainer(container: ContainerIdentifier)(implicit transid: TransactionId): DockerOutput = {
- runDockerCmd("kill", container.id)
- }
-
- def getContainerLogs(container: ContainerIdentifier)(implicit transid: TransactionId): DockerOutput = {
- runDockerCmd("logs", container.id)
- }
-
- def pauseContainer(container: ContainerIdentifier)(implicit transid: TransactionId): DockerOutput = {
- runDockerCmd(true, Seq("pause", container.id))
- }
-
- def unpauseContainer(container: ContainerIdentifier)(implicit transid: TransactionId): DockerOutput = {
- runDockerCmd(true, Seq("unpause", container.id))
- }
-
- /**
- * Forcefully removes a container, can be used on a running container but not a paused one.
- */
- def rmContainer(container: ContainerIdentifier)(implicit transid: TransactionId): DockerOutput = {
- runDockerCmd("rm", "-f", container.id)
- }
-
- /*
- * List containers (-a if all).
- */
- def listContainers(all: Boolean)(implicit transid: TransactionId): Seq[ContainerState] = {
- val tmp = Array("ps", "--no-trunc")
- val cmd = if (all) tmp :+ "-a" else tmp
- runDockerCmd(cmd: _*).toOption map { output =>
- val lines = output.split("\n").drop(1).toSeq // skip the header
- lines.map(parsePsOutput)
- } getOrElse Seq()
- }
-
- def getDockerLogSize(containerId: ContainerHash, mounted: Boolean)(implicit transid: TransactionId): Long = {
- try {
- getDockerLogFile(containerId, mounted).length
- } catch {
- case e: Exception =>
- error(this, s"getDockerLogSize failed on $containerId")
- 0
- }
- }
-
- /**
- * Reads the contents of the file at the given position.
- * It is assumed that the contents does exist and that region is not changing concurrently.
- */
- def getDockerLogContent(containerHash: ContainerHash, start: Long, end: Long, mounted: Boolean)(implicit transid: TransactionId): Array[Byte] = {
- var fis: java.io.FileInputStream = null
- try {
- val file = getDockerLogFile(containerHash, mounted)
- fis = new java.io.FileInputStream(file)
- val channel = fis.getChannel().position(start)
- var remain = (end - start).toInt
- val buffer = java.nio.ByteBuffer.allocate(remain)
- while (remain > 0) {
- val read = channel.read(buffer)
- if (read > 0)
- remain = read - read.toInt
- }
- buffer.array
- } catch {
- case e: Exception =>
- error(this, s"getDockerLogContent failed on ${containerHash.hash}: ${e.getClass}: ${e.getMessage}")
- Array()
- } finally {
- if (fis != null) fis.close()
- }
-
- }
-
- def getContainerHostAndPort(container: ContainerIdentifier)(implicit transid: TransactionId): Option[ContainerAddr] = {
- // FIXME it would be good if this could return ContainerAddr and fail loudly instead.
- runDockerCmd("inspect", "--format", "'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}'", container.id).toOption.map { output =>
- ContainerAddr(output.substring(1, output.length - 1), 8080)
- }
- }
-
- private def runDockerCmd(args: String*)(implicit transid: TransactionId): DockerOutput = runDockerCmd(false, args)
-
- /**
- * Synchronously runs the given docker command returning stdout if successful.
- */
- private def runDockerCmd(skipLogError: Boolean, args: Seq[String])(implicit transid: TransactionId): DockerOutput =
- ContainerUtils.runDockerCmd(dockerhost, skipLogError, args)(transid)
-
- // If running outside a container, then logs files are in docker's own
- // /var/lib/docker/containers. If running inside a container, is mounted at /containers.
- // Root access is needed when running outside the container.
- private def dockerContainerDir(mounted: Boolean) = {
- if (mounted) "/containers" else "/var/lib/docker/containers"
- }
-
- /**
- * Gets the filename of the docker logs of other containers that is mapped back into the invoker.
- */
- private def getDockerLogFile(containerId: ContainerHash, mounted: Boolean) = {
- new java.io.File(s"""${dockerContainerDir(mounted)}/${containerId.hash}/${containerId.hash}-json.log""").getCanonicalFile()
- }
-
- private def parsePsOutput(line: String): ContainerState = {
- val tokens = line.split("\\s+")
- val hash = ContainerHash.fromString(tokens(0))
- val name = ContainerName.fromString(tokens.last)
- ContainerState(hash, tokens(1), name)
- }
-}
-
-object ContainerUtils extends Logging {
-
- private implicit val emitter: PrintStreamEmitter = this
-
- /**
- * Synchronously runs the given docker command returning stdout if successful.
- */
- def runDockerCmd(dockerhost: String, skipLogError: Boolean, args: Seq[String])(implicit transid: TransactionId): DockerOutput = {
- val start = transid.started(this, LoggingMarkers.INVOKER_DOCKER_CMD(args(0)))
-
- try {
- val fullCmd = getDockerCmd(dockerhost) ++ args
-
- val (stdout, stderr, exitCode) = SimpleExec.syncRunCmd(fullCmd)
-
- if (exitCode == 0) {
- transid.finished(this, start)
- DockerOutput(stdout.trim)
- } else {
- if (!skipLogError) {
- transid.failed(this, start, s"stdout:\n$stdout\nstderr:\n$stderr", ErrorLevel)
- } else {
- transid.failed(this, start)
- }
- DockerOutput.unavailable
- }
- } catch {
- case t: Throwable =>
- transid.failed(this, start, "error: " + t.getMessage, ErrorLevel)
- DockerOutput.unavailable
- }
- }
-
- private def getDockerCmd(dockerhost: String): Seq[String] = {
- def file(path: String) = Try { new File(path) } filter { _.exists } toOption
-
- val dockerLoc = file("/usr/bin/docker") orElse file("/usr/local/bin/docker")
-
- val dockerBin = dockerLoc.map(_.toString).getOrElse {
- throw new FileNotFoundException("Couldn't locate docker binary.")
- }
-
- if (dockerhost == "localhost") {
- Seq(dockerBin)
- } else {
- Seq(dockerBin, "--host", s"tcp://$dockerhost")
- }
- }
-
- /**
- * Pulls container images.
- */
- def pullImage(dockerhost: String, image: String)(implicit transid: TransactionId): DockerOutput = {
- val cmd = Array("pull", image)
- runDockerCmd(dockerhost, false, cmd)
- }
-
-}
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/WhiskContainer.scala b/core/dispatcher/src/main/scala/whisk/core/container/WhiskContainer.scala
deleted file mode 100644
index fab78a0..0000000
--- a/core/dispatcher/src/main/scala/whisk/core/container/WhiskContainer.scala
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Copyright 2015-2016 IBM Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.container
-
-import java.time.Clock
-import java.time.Instant
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.concurrent.duration.DurationInt
-
-import scala.concurrent.Await
-import scala.concurrent.Future
-import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.duration.DurationInt
-
-import akka.actor.ActorSystem
-import akka.event.Logging.LogLevel
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.model._
-import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-import akka.http.scaladsl.marshalling._
-import akka.http.scaladsl.unmarshalling._
-import akka.stream.ActorMaterializer
-
-import spray.json.JsObject
-import spray.json.JsString
-import whisk.common.TransactionId
-import whisk.core.entity.ActionLimits
-import whisk.common.LoggingMarkers
-import whisk.common.PrintStreamEmitter
-import whisk.common.NewHttpUtils
-
-/**
- * Reifies a whisk container - one that respects the whisk container API.
- */
-class WhiskContainer(
- originalId: TransactionId,
- dockerhost: String,
- key: ActionContainerId,
- containerName: ContainerName,
- image: String,
- network: String,
- cpuShare: Int,
- policy: Option[String],
- env: Map[String, String],
- limits: ActionLimits,
- args: Array[String] = Array(),
- val isBlackbox: Boolean,
- logLevel: LogLevel)
- extends Container(originalId, dockerhost, key, Some(containerName), image, network, cpuShare, policy, limits, env, args, logLevel) {
-
- var boundParams = JsObject() // Mutable to support pre-alloc containers
- var lastLogSize = 0L
- private implicit val emitter: PrintStreamEmitter = this
-
- /**
- * Merges previously bound parameters with arguments form payload.
- */
- def mergeParams(payload: JsObject, recurse: Boolean = true)(implicit transid: TransactionId): JsObject = {
- //debug(this, s"merging ${boundParams.compactPrint} with ${payload.compactPrint}")
- JsObject(boundParams.fields ++ payload.fields)
- }
-
- /**
- * Sends initialization payload to container.
- */
- def init(args: JsObject, timeout: FiniteDuration)(implicit system: ActorSystem, transid: TransactionId): RunResult = {
- info(this, s"sending initialization to ${this.details}")
- // when invoking /init, don't wait longer than the timeout configured for this action
- val result = sendPayload("/init", JsObject("value" -> args), timeout) // this will retry
- info(this, s"initialization result: ${result}")
- result
- }
-
- /**
- * Sends a run command to action container to run once.
- *
- * @param state the value of the status to compare the actual state against
- * @return triple of start time, end time, response for user action.
- */
- def run(args: JsObject, meta: JsObject, authKey: String, timeout: FiniteDuration, actionName: String, activationId: String)(implicit system: ActorSystem, transid: TransactionId): RunResult = {
- val startMarker = transid.started("Invoker", LoggingMarkers.INVOKER_ACTIVATION_RUN, s"sending arguments to $actionName $details")
- val result = sendPayload("/run", JsObject(meta.fields + ("value" -> args) + ("authKey" -> JsString(authKey))), timeout)
- // Use start and end time of the activation
- val RunResult(Interval(startActivation, endActivation), _) = result
- transid.finished("Invoker", startMarker.copy(startActivation), s"finished running activation id: $activationId", endTime = endActivation)
- result
- }
-
- /**
- * An alternative entry point for direct testing of action container.
- */
- def run(payload: String, activationId: String)(implicit system: ActorSystem): RunResult = {
- val params = JsObject("payload" -> JsString(payload))
- val meta = JsObject("activationId" -> JsString(activationId))
- run(params, meta, "no_auth_key", 30000.milliseconds, "no_action", "no_activation_id")(system, TransactionId.testing)
- }
-
- /**
- * Tear down the container and retrieve the logs.
- */
- def teardown()(implicit transid: TransactionId): String = {
- getContainerLogs(containerName).toOption.getOrElse("none")
- }
-
- /**
- * Posts a message to the container.
- *
- * @param msg the message to post
- * @return response from container if any as array of byte
- */
- private def sendPayload(endpoint: String, msg: JsObject, timeout: FiniteDuration)(implicit system: ActorSystem): RunResult = {
- import system.dispatcher
-
- val start = ContainerCounter.now()
-
- val f = sendPayloadAsync(endpoint, msg, timeout)
-
- f.onFailure {
- case t: Throwable =>
- warn(this, s"Exception while posting to action container ${t.getMessage}")
- }
-
- // Should never timeout because the future has a built-in timeout.
- // Keeping a finite duration for safety.
- Await.ready(f, timeout + 1.minute)
-
- val end = ContainerCounter.now()
-
- val r = f.value.get.toOption.flatten
- RunResult(Interval(start, end), r)
- }
-
- /**
- * Asynchronously posts a message to the container.
- *
- * @param msg the message to post
- * @return response from the container if any
- */
- private def sendPayloadAsync(endpoint: String, msg: JsObject, timeout: FiniteDuration)(implicit system: ActorSystem): Future[Option[(Int, String)]] = {
- implicit val ec = system.dispatcher
- implicit val materializer = ActorMaterializer()
-
- containerHostAndPort map { hp =>
-
- val flow = Http().outgoingConnection(hp.host, hp.port)
-
- val uri = Uri(
- scheme = "http",
- authority = Uri.Authority(host = Uri.Host(hp.host), port = hp.port),
- path = Uri.Path(endpoint))
-
- for (
- entity <- Marshal(msg).to[MessageEntity];
- request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity);
- response <- NewHttpUtils.singleRequest(request, timeout, retryOnTCPErrors = true, retryInterval = 100.milliseconds);
- responseBody <- Unmarshal(response.entity).to[String]
- ) yield {
- Some((response.status.intValue, responseBody))
- }
- } getOrElse {
- Future.successful(None)
- }
- }
-}
-
-/**
- * Singleton to thread-safely count containers.
- */
-protected[container] object ContainerCounter {
- private val cnt = new AtomicInteger(0)
- private def next(): Int = {
- cnt.incrementAndGet()
- }
- private def cut(): Int = {
- cnt.get()
- }
-
- def now() = Instant.now(Clock.systemUTC())
-
- def containerName(containerPrefix: String, containerSuffix: String): ContainerName = {
- val name = s"wsk${containerPrefix}_${ContainerCounter.next()}_${containerSuffix}_${now()}".replaceAll("[^a-zA-Z0-9_]", "")
- ContainerName.fromString(name)
- }
-}
diff --git a/core/dispatcher/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala b/core/dispatcher/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala
deleted file mode 100644
index e3333be..0000000
--- a/core/dispatcher/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Copyright 2015-2016 IBM Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.dispatcher
-
-import scala.concurrent.duration.FiniteDuration
-import scala.util.Try
-
-import org.apache.kafka.clients.consumer.CommitFailedException
-
-import akka.actor.Actor
-import akka.actor.actorRef2Scala
-import whisk.common.Logging
-import whisk.common.TransactionId
-import whisk.core.connector.MessageConsumer
-
-object ActivationFeed {
- sealed class ActivationNotification
-
- /** Pulls new messages from the message bus. */
- case class FillQueueWithMessages()
-
- /** Indicates resources are available because transaction completed, may cause pipeline fill. */
- case class ContainerReleased(tid: TransactionId) extends ActivationNotification
-
- /** Indicate resources are available because transaction failed, may cause pipeline fill. */
- case class FailedActivation(tid: TransactionId) extends ActivationNotification
-}
-
-/**
- * This actor polls the message bus for new messages and dispatches them to the given
- * handler. The actor tracks the number of messages dispatched and will not dispatch new
- * messages until some number of them are acknowledged.
- *
- * This is used by the invoker to pull messages from the message bus and apply back pressure
- * when the invoker does not have resources to complete processing messages (i.e., no containers
- * are available to run new actions).
- *
- * When the invoker releases resources (by reclaiming containers) it will send a message
- * to this actor which will then attempt to fill the pipeline with new messages.
- *
- * The actor tries to fill the pipeline with additional messages while the number
- * of outstanding requests is below the pipeline fill threshold.
- */
-@throws[IllegalArgumentException]
-protected class ActivationFeed(
- logging: Logging,
- consumer: MessageConsumer,
- maxPipelineDepth: Int,
- longpollDuration: FiniteDuration,
- handler: (String, Array[Byte]) => Any)
- extends Actor {
- import ActivationFeed.ActivationNotification
- import ActivationFeed.FillQueueWithMessages
-
- require(consumer.maxPeek <= maxPipelineDepth, "consumer may not yield more messages per peek than permitted by max depth")
-
- private val pipelineFillThreshold = maxPipelineDepth - consumer.maxPeek
- private var pipelineOccupancy = 0
- private implicit val tid = TransactionId.dispatcher
-
- override def receive = {
- case FillQueueWithMessages =>
- if (pipelineOccupancy <= pipelineFillThreshold) {
- Try {
- // Grab next batch of messages and commit offsets immediately
- // essentially marking the activation as having satisfied "at most once"
- // semantics (this is the point at which the activation is considered started).
- // If the commit fails, then messages peeked are peeked again on the next poll.
- // While the commit is synchronous and will block until it completes, at steady
- // state with enough buffering (i.e., maxPipelineDepth > maxPeek), the latency
- // of the commit should be masked.
- val records = consumer.peek(longpollDuration)
- consumer.commit()
- (records, records.size)
- } map {
- case (records, count) =>
- records foreach {
- case (topic, partition, offset, bytes) =>
- logging.info(this, s"processing $topic[$partition][$offset ($count)]")
- pipelineOccupancy += 1
- handler(topic, bytes)
- }
- } recover {
- case e: CommitFailedException => logging.error(this, s"failed to commit consumer offset: ${e.getMessage}")
- case e: Throwable => logging.error(this, s"exception while pulling new records: ${e.getMessage}")
- }
- fill()
- } else logging.debug(this, "dropping fill request until feed is drained")
-
- case _: ActivationNotification =>
- pipelineOccupancy -= 1
- fill()
- }
-
- private def fill() = {
- if (pipelineOccupancy <= pipelineFillThreshold) {
- logging.debug(this, s"filling activation pipeline: $pipelineOccupancy <= $pipelineFillThreshold")
- self ! FillQueueWithMessages
- } else {
- logging.info(this, s"waiting for activation pipeline to drain: $pipelineOccupancy > $pipelineFillThreshold")
- }
- }
-}
diff --git a/core/dispatcher/src/main/scala/whisk/core/dispatcher/DispatchRule.scala b/core/dispatcher/src/main/scala/whisk/core/dispatcher/DispatchRule.scala
deleted file mode 100644
index 224aacf..0000000
--- a/core/dispatcher/src/main/scala/whisk/core/dispatcher/DispatchRule.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2015-2016 IBM Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.dispatcher
-
-import scala.concurrent.Future
-import scala.util.matching.Regex.Match
-import whisk.common.Logging
-import whisk.common.TransactionId
-import whisk.core.dispatcher.Matcher.makeRegexForPaths
-import whisk.core.connector.{ ActivationMessage => Message }
-
-/**
- * Abstract base class for a handler for a kafka message.
- *
- * This extends Matcher, which provides dispatch logic functionality based on kafka topics.
- */
-abstract class DispatchRule(
- ruleName: String,
- topicPrefix: String,
- topicPatterns: String)
- extends Matcher(ruleName, makeRegexForPaths(topicPatterns, topicPrefix))
- with Logging {
-
- /**
- * Invokes a handler for a Kafka messages. This method should only be called if matches is not empty.
- * The method is run inside a future. If the method fails with an exception, the exception completes
- * the wrapping future within which the method is run.
- *
- * @param topic the topic the message came in on
- * @param msg the Message object to process
- * @param matches a sequences of pattern matches that triggered this action
- * @param transid the transaction id for the kafka message
- * @return Future that execute the handler
- */
- def doit(topic: String, msg: Message, matches: Seq[Match])(implicit transid: TransactionId): Future[Any]
-}
diff --git a/core/dispatcher/src/main/scala/whisk/core/dispatcher/Dispatcher.scala b/core/dispatcher/src/main/scala/whisk/core/dispatcher/Dispatcher.scala
deleted file mode 100644
index b5b7776..0000000
--- a/core/dispatcher/src/main/scala/whisk/core/dispatcher/Dispatcher.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Copyright 2015-2016 IBM Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.dispatcher
-
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.Future
-import scala.concurrent.duration.FiniteDuration
-import scala.util.Failure
-import scala.util.Success
-import scala.util.matching.Regex.Match
-
-import akka.actor.ActorSystem
-import akka.actor.Props
-import akka.actor.actorRef2Scala
-import whisk.common.Counter
-import whisk.common.Logging
-import whisk.common.TransactionId
-import whisk.core.connector.{ ActivationMessage => Message }
-import whisk.core.connector.MessageConsumer
-import whisk.core.invoker.InvokerService
-import akka.event.Logging.LogLevel
-
-object Dispatcher extends Logging {
- def main(args: Array[String]): Unit = {
- val name = if (args.nonEmpty) args(0).trim.toLowerCase() else ""
- name match {
- case "invoker" => InvokerService.main(args)
- case _ => error(Dispatcher, s"unrecognized app $name")
- }
- }
-}
-
-/**
- * Creates a dispatcher that pulls messages from the message pub/sub connector.
- * This is currently used by invoker only. It may be removed in the future and
- * its functionality merged directly with the invoker. The current model allows
- * for different message types to be received by more than one consumer in the
- * same process (via handler registration).
- *
- * @param verbosity level for logging
- * @param consumer the consumer providing messages
- * @param pollDuration the long poll duration (max duration to wait for new messages)
- * @param maxPipelineDepth the maximum number of messages allowed in the queued (even >=2)
- * @param actorSystem an actor system to create actor
- */
-@throws[IllegalArgumentException]
-class Dispatcher(
- verbosity: LogLevel,
- consumer: MessageConsumer,
- pollDuration: FiniteDuration,
- maxPipelineDepth: Int,
- actorSystem: ActorSystem)
- extends Registrar
- with Logging {
-
- setVerbosity(verbosity)
-
- val activationFeed = actorSystem.actorOf(Props(new ActivationFeed(this: Logging, consumer, maxPipelineDepth, pollDuration, process)))
-
- def start() = activationFeed ! ActivationFeed.FillQueueWithMessages
- def stop() = consumer.close()
-
- /**
- * Consumes messages from the bus using a streaming consumer
- * interface. Each message is a JSON object with at least these properties:
- * { path: the topic name,
- * payload: the message body }
- *
- * Expected topics are "/whisk/invoke[0..n-1]" (handled by Invoker).
- * Expected paths "actions/invoke" (handled by Invoker).
- *
- * The paths should generally mirror the REST API.
- *
- * For every message that is received, this method extracts the path property
- * from the message and checks if there are registered handlers for the message.
- * A handler is registered via addHandler and unregistered via removeHandler.
- * All matches are checked in parallel, and messages are dispatched to all matching
- * handlers. The handling of a message is wrapped in a Future. A handler is skipped
- * if it is not active.
- */
- def process(topic: String, bytes: Array[Byte]) = {
- val raw = new String(bytes, "utf-8")
- Message(raw) match {
- case Success(m) =>
- implicit val tid = m.transid
- if (m.path.nonEmpty) inform(handlers) foreach {
- case (name, handler) =>
- val matches = handler.matches(m.path)
- handleMessage(handler, topic, m, matches)
- }
- case Failure(t) => info(this, errorMsg(raw, t))
- }
- }
-
- private def handleMessage(rule: DispatchRule, topic: String, msg: Message, matches: Seq[Match]) = {
- implicit val tid = msg.transid
- implicit val executionContext = actorSystem.dispatcher
-
- if (matches.nonEmpty) Future {
- val count = counter.next()
- debug(this, s"activeCount = $count while handling ${rule.name}")
- rule.doit(topic, msg, matches) // returns a future which is flat-mapped to hang onComplete
- } flatMap (identity) onComplete {
- case Success(a) => debug(this, s"activeCount = ${counter.prev()} after handling $rule")
- case Failure(t) => error(this, s"activeCount = ${counter.prev()} ${errorMsg(rule, t)}")
- }
- }
-
- private def inform(matchers: TrieMap[String, DispatchRule])(implicit transid: TransactionId) = {
- val names = matchers map { _._2.name } reduce (_ + "," + _)
- debug(this, s"matching message to ${matchers.size} handlers: $names")
- matchers
- }
-
- private def errorMsg(handler: DispatchRule, e: Throwable): String =
- s"failed applying handler '${handler.name}': ${errorMsg(e)}"
-
- private def errorMsg(msg: String, e: Throwable) =
- s"failed processing message: $msg $e${e.getStackTrace.mkString("", " ", "")}"
-
- private def errorMsg(e: Throwable): String = {
- if (e.isInstanceOf[java.util.concurrent.ExecutionException]) {
- s"$e${e.getCause.getStackTrace.mkString("", " ", "")}"
- } else {
- s"$e${e.getStackTrace.mkString("", " ", "")}"
- }
- }
-
- private val counter = new Counter()
-}
-
-trait Registrar {
- /**
- * Adds handler for a message. The handler name must be unique, else
- * the new handler replaces a previously added one unless this behavior
- * is overridden by setting replace to false.
- *
- * @param handler is the message handler to add override
- * @param replace indicates whether a new handler should replace an older handler by the same name
- * @return an option dispatch rule, the previous value of the rule if any
- */
- def addHandler(handler: DispatchRule, replace: Boolean): Option[DispatchRule] = {
- if (handler != null && handler.isValid) {
- if (replace) handlers.put(handler.name, handler)
- else handlers.putIfAbsent(handler.name, handler)
- } else None
- }
-
- /**
- * Removes handlers by name if it exists.
- *
- * @param name is the name of the handler to remove
- * @return the handler just removed if any
- */
- def removeHandler(name: String): Option[DispatchRule] = {
- if (name != null && name.trim.nonEmpty)
- handlers.remove(name)
- else None
- }
-
- /**
- * Removes handler if it exists.
- *
- * @param handler is the message handler to remove
- * @return the handler just removed if any
- */
- def removeHandler(handler: DispatchRule): Option[DispatchRule] = {
- if (handler != null && handler.isValid) {
- handlers.remove(handler.name)
- } else None
- }
-
- protected val handlers = new TrieMap[String, DispatchRule]
-}
diff --git a/core/dispatcher/src/main/scala/whisk/core/dispatcher/Matcher.scala b/core/dispatcher/src/main/scala/whisk/core/dispatcher/Matcher.scala
deleted file mode 100644
index 227e842..0000000
--- a/core/dispatcher/src/main/scala/whisk/core/dispatcher/Matcher.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Copyright 2015-2016 IBM Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.dispatcher
-
-import scala.util.matching.Regex
-import scala.util.matching.Regex.Match
-
-/**
- * Creates a match rule to match a kafka topic against one or more regular expressions.
- *
- * @param n the name for this rule -- no semantic effect,just used for debugging and printing
- * @param p one or more regular expression to match a topic against
- */
-class Matcher(n: String, p: Regex*) {
-
- def this(n: String, p: List[Regex]) =
- this(n, p: _*)
-
- def this(n: String, p: String) =
- this(n, Matcher.makeRegexForPaths(p): _*)
-
- /**
- * Checks if a topic matches any of the registered path patterns.
- *
- * @param topic the string to check patterns against.
- * @return sequence of Regex.Match which can be queried for grouped matches
- * if there are no matches, the sequence is empty
- */
- def matches(topic: String): Seq[Match] = {
- val t = if (topic != null) topic.trim else ""
- if (t.nonEmpty) {
- paths.flatMap { p =>
- for (m <- p findAllMatchIn t) yield m
- }
- } else Seq[Match]()
- }
-
- val name = if (n != null) n.trim else ""
- def isValid = name.nonEmpty && paths.length > 0
- override def hashCode = name.hashCode
- override def toString = s"$name: matches on " + paths.mkString(",")
- private val paths = if (p != null) p.filter { x => x != null } else List[Regex]()
-}
-
-object Matcher {
-
- /**
- * Makes regex array for given comma separated paths string, each regex is
- * of the form "(?i)^[/prefix]/(name)$".
- *
- * @param names required string of comma separated path names
- * @param prefix optional prefix to the path, may be null or empty
- * @param ignoreCase optional, true to ignore case otherwise case-sensitive
- */
- def makeRegexForPaths(names: String, prefix: String = null, ignoreCase: Boolean = true): List[Regex] =
- if (names != null) {
- names.split(',')
- .filter { x => x != null && x.trim.nonEmpty }
- .toList
- .map(s => makeRegexForPath(s, prefix, ignoreCase))
- } else List[Regex]()
-
- /**
- * Makes regex string of the form "(?i)^[/prefix]/(name)$".
- *
- * @param name required name
- * @param prefix optional prefix to the name, may be null or empty
- * @param ignoreCase optional, true to ignore case otherwise case-sensitive
- */
- def makeRegexForPath(name: String, prefix: String = null, ignoreCase: Boolean = true): Regex = {
- val trimPrefix = if (prefix != null) prefix.trim.stripMargin('/') else ""
- val trimName = if (name != null) name.trim.stripMargin('/') else ""
- val caseString = if (ignoreCase) "(?i)" else ""
- val sep = if (trimPrefix.isEmpty) "" else "/"
- if (trimName.nonEmpty) {
- s"$caseString^$sep$trimPrefix/($trimName)$$".r
- } else null
- }
-}
diff --git a/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala b/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
deleted file mode 100644
index 4791e3e..0000000
--- a/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
+++ /dev/null
@@ -1,585 +0,0 @@
-/*
- * Copyright 2015-2016 IBM Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.invoker
-
-import java.time.Clock
-import java.time.Instant
-
-import scala.Vector
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.duration.Duration
-import scala.concurrent.duration.DurationInt
-import scala.language.postfixOps
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
-import scala.util.matching.Regex.Match
-
-import akka.actor.ActorRef
-import akka.actor.ActorSystem
-import akka.actor.actorRef2Scala
-import akka.event.Logging.InfoLevel
-import akka.event.Logging.LogLevel
-import akka.japi.Creator
-import spray.json._
-import spray.json.DefaultJsonProtocol.IntJsonFormat
-import spray.json.DefaultJsonProtocol.StringJsonFormat
-import whisk.common.ConsulClient
-import whisk.common.ConsulKV.InvokerKeys
-import whisk.common.ConsulKVReporter
-import whisk.common.Counter
-import whisk.common.LoggingMarkers
-import whisk.common.PrintStreamEmitter
-import whisk.common.SimpleExec
-import whisk.common.TransactionId
-import whisk.connector.kafka.KafkaConsumerConnector
-import whisk.connector.kafka.KafkaProducerConnector
-import whisk.core.WhiskConfig
-import whisk.core.WhiskConfig.consulServer
-import whisk.core.WhiskConfig.dockerImagePrefix
-import whisk.core.WhiskConfig.dockerRegistry
-import whisk.core.WhiskConfig.edgeHost
-import whisk.core.WhiskConfig.kafkaHost
-import whisk.core.WhiskConfig.logsDir
-import whisk.core.WhiskConfig.servicePort
-import whisk.core.WhiskConfig.whiskVersion
-import whisk.core.connector.{ ActivationMessage => Message }
-import whisk.core.connector.CompletionMessage
-import whisk.core.container.ContainerPool
-import whisk.core.container.Interval
-import whisk.core.container.RunResult
-import whisk.core.container.WhiskContainer
-import whisk.core.dispatcher.ActivationFeed.ActivationNotification
-import whisk.core.dispatcher.ActivationFeed.ContainerReleased
-import whisk.core.dispatcher.ActivationFeed.FailedActivation
-import whisk.core.dispatcher.DispatchRule
-import whisk.core.dispatcher.Dispatcher
-import whisk.core.entity.ActivationLogs
-import whisk.core.entity.ActivationResponse
-import whisk.core.entity.BlackBoxExec
-import whisk.core.entity.DocId
-import whisk.core.entity.DocInfo
-import whisk.core.entity.DocRevision
-import whisk.core.entity.EntityName
-import whisk.core.entity.LogLimit
-import whisk.core.entity.Namespace
-import whisk.core.entity.SemVer
-import whisk.core.entity.Subject
-import whisk.core.entity.WhiskAction
-import whisk.core.entity.WhiskActivation
-import whisk.core.entity.WhiskActivationStore
-import whisk.core.entity.WhiskAuth
-import whisk.core.entity.WhiskAuthStore
-import whisk.core.entity.WhiskEntity
-import whisk.core.entity.WhiskEntityStore
-import whisk.core.entity.size.SizeLong
-import whisk.core.entity.size.SizeString
-import whisk.http.BasicHttpService
-import whisk.utils.ExecutionContextFactory
-import whisk.common.Logging
-
-/**
- * A kafka message handler that invokes actions as directed by message on topic "/actions/invoke".
- * The message path must contain a fully qualified action name and an optional revision id.
- *
- * @param config the whisk configuration
- * @param instance the invoker instance number
- * @param runningInContainer if false, invoker is run outside a container -- for testing
- */
-class Invoker(
- config: WhiskConfig,
- instance: Int,
- activationFeed: ActorRef,
- verbosity: LogLevel = InfoLevel,
- runningInContainer: Boolean = true)(implicit actorSystem: ActorSystem)
- extends DispatchRule("invoker", "/actions/invoke", s"""(.+)/(.+)/(.+),(.+)/(.+)""") {
-
- private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
- private implicit val emitter: PrintStreamEmitter = this
-
- /** This generates completion messages back to the controller */
- val producer = new KafkaProducerConnector(config.kafkaHost, executionContext)
-
- override def setVerbosity(level: LogLevel) = {
- super.setVerbosity(level)
- pool.setVerbosity(level)
- entityStore.setVerbosity(level)
- authStore.setVerbosity(level)
- activationStore.setVerbosity(level)
- producer.setVerbosity(level)
- }
-
- /*
- * We track the state of the transaction by wrapping the Message object.
- * Note that var fields cannot be added to Message as it leads to serialization issues and
- * doesn't make sense to mix local mutable state with the value being passed around.
- *
- * See completeTransaction for why complete is needed.
- */
- case class Transaction(msg: Message) {
- var result: Option[Future[DocInfo]] = None
- var initInterval: Option[Interval] = None
- var runInterval: Option[Interval] = None
- }
-
- /**
- * This is the handler for the kafka message
- *
- * @param msg is the kafka message payload as Json
- * @param matches contains the regex matches
- */
- override def doit(topic: String, msg: Message, matches: Seq[Match])(implicit transid: TransactionId): Future[DocInfo] = {
- val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION)
- Future {
- // conformance checks can terminate the future if a variance is detected
- require(matches != null && matches.size >= 1, "matches undefined")
- require(matches(0).groupCount >= 3, "wrong number of matches")
- require(matches(0).group(2).nonEmpty, "action namespace undefined") // fully qualified name (namespace:name)
- require(matches(0).group(3).nonEmpty, "action name undefined") // fully qualified name (namespace:name)
- require(msg != null, "message undefined")
-
- val regex = matches(0)
- val namespace = Namespace(regex.group(2))
- val name = EntityName(regex.group(3))
- val version = if (regex.groupCount == 4) DocRevision(regex.group(4)) else DocRevision()
- val action = DocId(WhiskEntity.qualifiedName(namespace, name)).asDocInfo(version)
- action
- } flatMap {
- fetchFromStoreAndInvoke(_, msg) map { docInfo =>
- transid.finished(this, start)
- docInfo
- }
- }
- }
-
- /**
- * Common point for injection from Kafka or InvokerServer.
- */
- def fetchFromStoreAndInvoke(action: DocInfo, msg: Message)(
- implicit transid: TransactionId): Future[DocInfo] = {
- val tran = Transaction(msg)
- val subject = msg.subject
- val payload = msg.content getOrElse JsObject()
- info(this, s"${action.id} $subject ${msg.activationId}")
-
- // caching is enabled since actions have revision id and an updated
- // action will not hit in the cache due to change in the revision id;
- // if the doc revision is missing, then bypass cache
- val actionFuture = WhiskAction.get(entityStore, action, action.rev != DocRevision())
- actionFuture onFailure {
- case t => error(this, s"failed to fetch action ${action.id}: ${t.getMessage}")
- }
-
- // keys are immutable, cache them
- val authFuture = WhiskAuth.get(authStore, subject, true)
- authFuture onFailure {
- case t => error(this, s"failed to fetch auth key for $subject: ${t.getMessage}")
- }
-
- // when records are fetched, invoke action
- val activationDocFuture = actionFuture flatMap { theAction =>
- // assume this future is done here
- authFuture flatMap { theAuth =>
- // assume this future is done here
- invokeAction(theAction, theAuth, payload, tran)
- }
- }
-
- activationDocFuture onComplete {
- case Success(activationDoc) =>
- info(this, s"recorded activation '$activationDoc'")
- activationDoc
- case Failure(t) =>
- info(this, s"failed to invoke action ${action.id} due to ${t.getMessage}")
- completeTransactionWithError(action, tran, s"failed to invoke action ${action.id}: ${t.getMessage}")
- }
-
- activationDocFuture
- }
-
- /*
- * Creates a whisk activation out of the errorMsg and finish the transaction.
- * Failing with an error can involve multiple futures but the effecting call is completeTransaction which is guarded.
- */
- protected def completeTransactionWithError(actionDocInfo: DocInfo, tran: Transaction, errorMsg: String)(
- implicit transid: TransactionId): Unit = {
- error(this, errorMsg)
- val msg = tran.msg
- val name = EntityName(actionDocInfo.id().split(Namespace.PATHSEP)(1))
- val version = SemVer() // TODO: this is wrong, when the semver is passed from controller, fix this
- val response = ActivationResponse.whiskError(errorMsg)
- val interval = computeActivationInterval(tran)
- val activation = makeWhiskActivation(msg, name, version, response, interval)
- completeTransaction(tran, activation, FailedActivation(transid))
- }
-
- /*
- * Action that must be taken when an activation completes (with or without error).
- *
- * Invariant: Only one call to here succeeds. Even though the sync block wrap WhiskActivation.put,
- * it is only blocking this transaction which is finishing anyway.
- */
- protected def completeTransaction(tran: Transaction, activation: WhiskActivation, releaseResource: ActivationNotification)(
- implicit transid: TransactionId): Future[DocInfo] = {
- tran.synchronized {
- tran.result match {
- case Some(res) => res
- case None => {
- activationCounter.next() // this is the global invoker counter
- incrementUserActivationCounter(tran.msg.subject)
- // Send a message to the activation feed indicating there is a free resource to handle another activation.
- // Since all transaction completions flow through this method and the invariant is that the transaction is
- // completed only once, there is only one completion message sent to the feed as a result.
- activationFeed ! releaseResource
- // Since there is no active action taken for completion from the invoker, writing activation record is it.
- info(this, "recording the activation result to the data store")
- val result = WhiskActivation.put(activationStore, activation)
- tran.result = Some(result)
- result
- }
- }
- }
- }
-
- protected def invokeAction(action: WhiskAction, auth: WhiskAuth, payload: JsObject, tran: Transaction)(
- implicit transid: TransactionId): Future[DocInfo] = {
- val msg = tran.msg
-
- pool.getAction(action, auth) match {
- case Some((con, initResultOpt)) => Future {
- val params = con.mergeParams(payload)
- val timeout = action.limits.timeout.duration
- def run() = con.run(params, msg.meta, auth.compact, timeout, action.fullyQualifiedName, msg.activationId.toString)
-
- initResultOpt match {
- // cached container
- case None => (false, run())
-
- // new container
- case Some(RunResult(interval, response)) =>
- tran.initInterval = Some(interval)
- response match {
- // successful init
- case Some((200, _)) => (false, run())
- // unsuccessful initialization
- case _ => (true, initResultOpt.get)
- }
- }
- } flatMap {
- case (failedInit, RunResult(interval, response)) =>
- // it is possible for response to be None if the container timed out
- if (!failedInit) tran.runInterval = Some(interval)
-
- val activationInterval = computeActivationInterval(tran)
- val activationResponse = getActivationResponse(activationInterval, action.limits.timeout.duration, response, failedInit)
- val activationResult = makeWhiskActivation(msg, action.name, action.version, activationResponse, activationInterval)
- val completeMsg = CompletionMessage(transid, activationResult)
-
- producer.send("completed", completeMsg) map { status =>
- info(this, s"posted completion of activation ${msg.activationId}")
- }
-
- val contents = getContainerLogs(con, action.exec.sentinelledLogs, action.limits.logs)
-
- // Force delete the container instead of just pausing it iff the initialization failed or the container
- // failed otherwise. An example of a ContainerError is the timeout of an action in which case the
- // container is to be removed to prevent leaking
- pool.putBack(con, failedInit)
-
- completeTransaction(tran, activationResult withLogs ActivationLogs.serdes.read(contents), ContainerReleased(transid))
- }
-
- case None => { // this corresponds to the container not even starting - not /init failing
- info(this, s"failed to start or get a container")
- val response = if (action.exec.kind == BlackBoxExec) {
- ActivationResponse.containerError("the container did to start")
- } else {
- ActivationResponse.whiskError("error starting container to run action")
- }
- val interval = computeActivationInterval(tran)
- val activation = makeWhiskActivation(msg, action.name, action.version, response, interval)
- completeTransaction(tran, activation, FailedActivation(transid))
- }
- }
- }
-
- // The nodeJsAction runner inserts this line in the logs at the end
- // of each activation
- private val LogRetryCount = 15
- private val LogRetry = 100 // millis
- private val LOG_ACTIVATION_SENTINEL = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
-
- /**
- * Waits for log cursor to advance. This will retry up to tries times
- * if the cursor has not yet advanced. This will penalize containers that
- * do not log. It is OK for nodejs containers because the runtime emits
- * the END_OF_ACTIVATION_MARKER automatically and that advances the cursor.
- *
- * Note: Updates the container's log cursor to indicate consumption of log.
- */
- private def getContainerLogs(con: WhiskContainer, sentinelled: Boolean, loglimit: LogLimit, tries: Int = LogRetryCount)(
- implicit transid: TransactionId): JsArray = {
- val size = pool.getLogSize(con, runningInContainer)
- val advanced = size != con.lastLogSize
- if (tries <= 0 || advanced) {
- val rawLogBytes = con.synchronized {
- pool.getDockerLogContent(con.containerId, con.lastLogSize, size, runningInContainer)
- }
- val rawLog = new String(rawLogBytes, "UTF-8")
-
- val (complete, isTruncated, logs) = processJsonDriverLogContents(rawLog, sentinelled, loglimit)
-
- if (tries > 0 && !complete && !isTruncated) {
- info(this, s"log cursor advanced but missing sentinel, trying $tries more times")
- Thread.sleep(LogRetry)
- getContainerLogs(con, sentinelled, loglimit, tries - 1)
- } else {
- con.lastLogSize = size
- val formattedLogs = logs.map(_.toFormattedString)
-
- val finishedLogs = if (isTruncated) {
- formattedLogs :+ loglimit.truncatedLogMessage
- } else formattedLogs
-
- JsArray(finishedLogs.map(_.toJson))
- }
- } else {
- info(this, s"log cursor has not advanced, trying $tries more times")
- Thread.sleep(LogRetry)
- getContainerLogs(con, sentinelled, loglimit, tries - 1)
- }
- }
-
- /**
- * Represents a single log line as read from a docker log
- */
- private case class LogLine(time: String, stream: String, log: String) {
- def toFormattedString = f"$time%-30s $stream: ${log.trim}"
- }
- private object LogLine extends DefaultJsonProtocol {
- implicit val serdes = jsonFormat3(LogLine.apply)
- }
-
- /**
- * Given the JSON driver's raw output of a docker container, convert it into our own
- * JSON format. If asked, check for sentinel markers (which are not included in the output).
- *
- * Only parses and returns so much logs to fit into the LogLimit passed.
- *
- * @param logMsgs raw String read from a JSON log-driver written file
- * @param requireSentinel determines if the processor should wait for a sentinel to appear
- * @param limit the limit to apply to the log size
- *
- * @return Tuple containing (isComplete, isTruncated, logs)
- */
- private def processJsonDriverLogContents(logMsgs: String, requireSentinel: Boolean, limit: LogLimit)(
- implicit transid: TransactionId): (Boolean, Boolean, Vector[LogLine]) = {
-
- if (logMsgs.nonEmpty) {
- val records = logMsgs.split("\n").toStream flatMap {
- line =>
- Try { line.parseJson.convertTo[LogLine] } match {
- case Success(t) => Some(t)
- case Failure(t) =>
- // Drop lines that did not parse to JSON objects.
- // However, should not happen since we are using the json log driver.
- error(this, s"log line skipped/did not parse: $t")
- None
- }
- }
-
- val cumulativeSizes = records.scanLeft(0.bytes) { (acc, current) => acc + current.log.sizeInBytes }.tail
- val truncatedLogs = records.zip(cumulativeSizes).takeWhile(_._2 < limit().megabytes).map(_._1).toVector
- val isTruncated = truncatedLogs.size < records.size
-
- if (isTruncated) {
- (true, true, truncatedLogs)
- } else if (requireSentinel) {
- val (sentinels, regulars) = truncatedLogs.partition(_.log.trim == LOG_ACTIVATION_SENTINEL)
- val hasOut = sentinels.exists(_.stream == "stdout")
- val hasErr = sentinels.exists(_.stream == "stderr")
- (hasOut && hasErr, false, regulars)
- } else {
- (true, false, truncatedLogs)
- }
- } else {
- warn(this, s"log message is empty")
- (!requireSentinel, false, Vector())
- }
- }
-
- private def incrementUserActivationCounter(user: Subject)(implicit transid: TransactionId): Int = {
- val counter = userActivationCounter.getOrElseUpdate(user(), new Counter())
- val count = counter.next()
- info(this, s"'${user}' has '$count' activations processed")
- count
- }
-
- private def getUserActivationCounts(): Map[String, JsObject] = {
- val subjects = userActivationCounter.keySet toList
- val groups = subjects.groupBy { user => user.substring(0, 1) } // Any sort of partitioning will be ok wrt load balancer
- groups.keySet map { prefix =>
- val key = InvokerKeys.userActivationCount(instance) + "/" + prefix
- val users = groups.getOrElse(prefix, Set())
- val items = users map { u => (u, JsNumber(userActivationCounter.get(u) map { c => c.cur } getOrElse 0)) }
- key -> JsObject(items toMap)
- } toMap
- }
-
- // -------------------------------------------------------------------------------------------------------------
-
- /**
- * Interprets the responses from the container and maps it to an appropriate ActivationResponse.
- */
- private def getActivationResponse(
- interval: Interval,
- timeout: Duration,
- response: Option[(Int, String)],
- failedInit: Boolean)(
- implicit transid: TransactionId): ActivationResponse = {
- if (interval.duration >= timeout) {
- ActivationResponse.applicationError(ActivationResponse.timedoutActivation(timeout, failedInit))
- } else if (!failedInit) {
- ActivationResponse.processRunResponseContent(response, this: Logging)
- } else {
- ActivationResponse.processInitResponseContent(response, this: Logging)
- }
- }
-
- /**
- * Creates a WhiskActivation for the given action, response and duration.
- */
- private def makeWhiskActivation(
- msg: Message,
- actionName: EntityName,
- actionVersion: SemVer,
- activationResponse: ActivationResponse,
- interval: Interval) = {
- WhiskActivation(
- namespace = msg.subject.namespace,
- name = actionName,
- version = actionVersion,
- publish = false,
- subject = msg.subject,
- activationId = msg.activationId,
- cause = msg.cause,
- start = interval.start,
- end = interval.end,
- response = activationResponse,
- logs = ActivationLogs())
- }
-
- /**
- * Reconstructs an interval based on the time spent in the various operations.
- * The goal is for the interval to have a duration corresponding to the sum of all durations
- * and an endtime corresponding to the latest endtime.
- *
- * @param transaction the transaction object containing metadata
- * @return interval for the transaction with start/end times computed
- */
- private def computeActivationInterval(transaction: Transaction): Interval = {
- (transaction.initInterval, transaction.runInterval) match {
- case (None, Some(run)) => run
- case (Some(init), None) => init
- case (None, None) => Interval(Instant.now(Clock.systemUTC()), Instant.now(Clock.systemUTC()))
- case (Some(init), Some(Interval(runStart, runEnd))) =>
- Interval(runStart.minusMillis(init.duration.toMillis), runEnd)
- }
- }
-
- private val entityStore = WhiskEntityStore.datastore(config)
- private val authStore = WhiskAuthStore.datastore(config)
- private val activationStore = WhiskActivationStore.datastore(config)
- private val pool = new ContainerPool(config, instance, verbosity)
- private val activationCounter = new Counter() // global activation counter
- private val userActivationCounter = new TrieMap[String, Counter]
-
- /**
- * Repeatedly updates the KV store as to the invoker's last check-in.
- */
- private val kv = new ConsulClient(config.consulServer)
- private val reporter = new ConsulKVReporter(kv, 3 seconds, 2 seconds,
- InvokerKeys.hostname(instance),
- InvokerKeys.start(instance),
- InvokerKeys.status(instance),
- { index =>
- (if (index % 5 == 0) getUserActivationCounts() else Map[String, JsValue]()) ++
- Map(InvokerKeys.activationCount(instance) -> activationCounter.cur.toJson)
- })
-
- setVerbosity(verbosity)
-}
-
-object Invoker {
- /**
- * An object which records the environment variables required for this component to run.
- */
- def requiredProperties = Map(
- servicePort -> 8080.toString(),
- logsDir -> null,
- dockerRegistry -> null,
- dockerImagePrefix -> null) ++
- WhiskAuthStore.requiredProperties ++
- WhiskEntityStore.requiredProperties ++
- WhiskActivationStore.requiredProperties ++
- ContainerPool.requiredProperties ++
- kafkaHost ++
- edgeHost ++
- consulServer ++
- whiskVersion
-}
-
-object InvokerService {
- /**
- * An object which records the environment variables required for this component to run.
- */
- def requiredProperties = Invoker.requiredProperties
-
- def main(args: Array[String]): Unit = {
- implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
- implicit val system: ActorSystem = ActorSystem(
- name = "invoker-actor-system",
- defaultExecutionContext = Some(ec))
-
- // load values for the required properties from the environment
- val config = new WhiskConfig(requiredProperties)
-
- if (config.isValid) {
- val instance = if (args.length > 0) args(1).toInt else 0
- val verbosity = InfoLevel
-
- SimpleExec.setVerbosity(verbosity)
-
- val topic = s"invoke${instance}"
- val groupid = "invokers"
- val maxdepth = ContainerPool.getDefaultMaxActive(config)
- val consumer = new KafkaConsumerConnector(config.kafkaHost, groupid, topic, maxdepth)
- val dispatcher = new Dispatcher(verbosity, consumer, 500 milliseconds, 2 * maxdepth, system)
-
- val invoker = new Invoker(config, instance, dispatcher.activationFeed, verbosity)
- dispatcher.addHandler(invoker, true)
- dispatcher.start()
-
- val port = config.servicePort.toInt
- BasicHttpService.startService(system, "invoker", "0.0.0.0", port, new Creator[InvokerServer] {
- def create = new InvokerServer {}
- })
- }
- }
-}
diff --git a/core/dispatcher/src/main/scala/whisk/core/invoker/InvokerServer.scala b/core/dispatcher/src/main/scala/whisk/core/invoker/InvokerServer.scala
deleted file mode 100644
index 2d06618..0000000
--- a/core/dispatcher/src/main/scala/whisk/core/invoker/InvokerServer.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2015-2016 IBM Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.invoker
-
-import akka.actor.Actor
-import whisk.common.Logging
-import whisk.http.BasicRasService
-
-/**
- * Implements web server to handle certain REST API calls.
- * Currently provides a health ping route, only.
- */
-trait InvokerServer
- extends BasicRasService
- with Actor
- with Logging {
-
- override def actorRefFactory = context
-}
diff --git a/settings.gradle b/settings.gradle
index fbc9781..c9ad341 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,7 +1,7 @@
include 'common:scala'
include 'core:controller'
-include 'core:dispatcher'
+include 'core:invoker'
include 'core:nodejsActionBase'
include 'core:nodejsAction'
include 'core:nodejs6Action'
diff --git a/tests/build.gradle b/tests/build.gradle
index 2f0da6f..6c0a7ba 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -62,7 +62,7 @@
testCompile project(':common:scala')
testCompile project(':core:controller')
- testCompile project(':core:dispatcher')
+ testCompile project(':core:invoker')
}
tasks.withType(ScalaCompile) {