blob: 3b5a6c4ca42ba81c8546792900f26e4e35ed9dc0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.containerpool
import akka.actor.ActorSystem
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.entity.{ByteSize, ExecManifest, ExecutableWhiskAction, InvokerInstanceId}
import org.apache.openwhisk.spi.Spi
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.math.max
case class ContainerArgsConfig(network: String,
dnsServers: Seq[String] = Seq.empty,
dnsSearch: Seq[String] = Seq.empty,
dnsOptions: Seq[String] = Seq.empty,
extraEnvVars: Seq[String] = Seq.empty,
extraArgs: Map[String, Set[String]] = Map.empty) {
val extraEnvVarMap: Map[String, String] =
extraEnvVars.flatMap {
_.split("=", 2) match {
case Array(key) => Some(key -> "")
case Array(key, value) => Some(key -> value)
case _ => None
}
}.toMap
}
case class ContainerPoolConfig(userMemory: ByteSize,
concurrentPeekFactor: Double,
akkaClient: Boolean,
prewarmExpirationCheckInitDelay: FiniteDuration,
prewarmExpirationCheckInterval: FiniteDuration,
prewarmExpirationCheckIntervalVariance: Option[FiniteDuration],
prewarmExpirationLimit: Int,
prewarmMaxRetryLimit: Int,
prewarmPromotion: Boolean,
memorySyncInterval: FiniteDuration,
prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) {
require(
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
require(prewarmExpirationCheckInterval.toSeconds > 0, "prewarmExpirationCheckInterval must be > 0")
/**
* The shareFactor indicates the number of containers that would share a single core, on average.
* cpuShare is a docker option (-c) whereby a container's CPU access is limited.
* A value of 1024 is the full share so a strict resource division with a shareFactor of 2 would yield 512.
* On an idle/underloaded system, a container will still get to use underutilized CPU shares.
*/
private val totalShare = 1024.0 // This is a pre-defined value coming from docker and not our hard-coded value.
// Grant more CPU to a container if it allocates more memory.
def cpuShare(reservedMemory: ByteSize) =
max((totalShare / (userMemory.toBytes / reservedMemory.toBytes)).toInt, 2) // The minimum allowed cpu-shares is 2
}
case class PrewarmContainerCreationConfig(maxConcurrent: Int, creationDelay: FiniteDuration) {
require(maxConcurrent > 0, "maxConcurrent for per invoker must be > 0")
require(creationDelay.toSeconds > 0, "creationDelay must be > 0")
}
case class RuntimesRegistryCredentials(user: String, password: String)
case class RuntimesRegistryConfig(url: String, credentials: Option[RuntimesRegistryCredentials])
/**
* An abstraction for Container creation
*/
trait ContainerFactory {
/**
* Create a new Container
*
* The created container has to satisfy following requirements:
* - The container's file system is based on the provided action image and may have a read/write layer on top.
* Some managed action runtimes may need the capability to write files.
* - If the specified image is not available on the system, it is pulled from an image
* repository - for example, Docker Hub.
* - The container needs a network setup - usually, a network interface - such that the invoker is able
* to connect the action container. The container must be able to perform DNS resolution based
* on the settings provided via ContainerArgsConfig. If needed by action authors,
* the container should be able to connect to other systems or even the internet to consume services.
* - The IP address of said interface is stored in the created Container instance if you want to use
* the standard init / run behaviour defined in the Container trait.
* - The default process specified in the action image is run.
* - It is desired that all stdout / stderr written by processes in the container is captured such
* that it can be obtained using the logs() method of the Container trait.
* - It is desired that the container supports and enforces the specified memory limit and CPU shares.
* In particular, action memory limits rely on the underlying container technology.
*/
def createContainer(
tid: TransactionId,
name: String,
actionImage: ExecManifest.ImageName,
userProvidedImage: Boolean,
memory: ByteSize,
cpuShares: Int,
action: Option[ExecutableWhiskAction])(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares)
}
def createContainer(tid: TransactionId,
name: String,
actionImage: ExecManifest.ImageName,
userProvidedImage: Boolean,
memory: ByteSize,
cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container]
/** perform any initialization */
def init(): Unit
/** cleanup any remaining Containers; should block until complete; should ONLY be run at startup/shutdown */
def cleanup(): Unit
}
object ContainerFactory {
/** based on https://github.com/moby/moby/issues/3138 and https://github.com/moby/moby/blob/master/daemon/names/names.go */
private def isAllowed(c: Char) = c.isLetterOrDigit || c == '_' || c == '.' || c == '-'
/** include the instance name, if specified and strip invalid chars before attempting to use them in the container name */
def containerNamePrefix(instanceId: InvokerInstanceId): String =
s"wsk${instanceId.uniqueName.getOrElse("")}${instanceId.toInt}".filter(isAllowed)
def resolveRegistryConfig(userProvidedImage: Boolean,
runtimesRegistryConfig: RuntimesRegistryConfig,
userImagesRegistryConfig: RuntimesRegistryConfig): RuntimesRegistryConfig = {
if (userProvidedImage) userImagesRegistryConfig else runtimesRegistryConfig
}
}
/**
* An SPI for ContainerFactory creation
* All impls should use the parameters specified as additional args to "docker run" commands
*/
trait ContainerFactoryProvider extends Spi {
def instance(actorSystem: ActorSystem,
logging: Logging,
config: WhiskConfig,
instance: InvokerInstanceId,
parameters: Map[String, Set[String]]): ContainerFactory
}