blob: 8292eba1eac40d139ac1e1e159081dff13941390 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.containerpool.kubernetes
import akka.actor.ActorSystem
import pureconfig._
import pureconfig.generic.auto._
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.containerpool.{
Container,
ContainerArgsConfig,
ContainerFactory,
ContainerFactoryProvider,
RuntimesRegistryConfig
}
import org.apache.openwhisk.core.entity.ByteSize
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
import org.apache.openwhisk.core.entity.InvokerInstanceId
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
class KubernetesContainerFactory(
label: String,
config: WhiskConfig,
containerArgsConfig: ContainerArgsConfig = loadConfigOrThrow[ContainerArgsConfig](ConfigKeys.containerArgs),
runtimesRegistryConfig: RuntimesRegistryConfig =
loadConfigOrThrow[RuntimesRegistryConfig](ConfigKeys.runtimesRegistry),
userImagesRegistryConfig: RuntimesRegistryConfig = loadConfigOrThrow[RuntimesRegistryConfig](
ConfigKeys.userImagesRegistry))(implicit actorSystem: ActorSystem, ec: ExecutionContext, logging: Logging)
extends ContainerFactory {
implicit val kubernetes = initializeKubeClient()
private def initializeKubeClient(): KubernetesClient = {
val config = loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes)
new KubernetesClient(config)(ec)
}
/** Perform cleanup on init */
override def init(): Unit = cleanup()
override def cleanup() = {
logging.info(this, "Cleaning up function runtimes")
val labels = Map("invoker" -> label, "release" -> KubernetesContainerFactoryProvider.release)
val cleaning = kubernetes.rm(labels, true)(TransactionId.invokerNanny)
Await.ready(cleaning, KubernetesContainerFactoryProvider.runtimeDeleteTimeout)
}
override def createContainer(tid: TransactionId,
name: String,
actionImage: ImageName,
userProvidedImage: Boolean,
memory: ByteSize,
cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
val image = actionImage.resolveImageName(Some(
ContainerFactory.resolveRegistryConfig(userProvidedImage, runtimesRegistryConfig, userImagesRegistryConfig).url))
KubernetesContainer.create(
tid,
name,
image,
userProvidedImage,
memory,
environment = Map("__OW_API_HOST" -> config.wskApiHost) ++ containerArgsConfig.extraEnvVarMap,
labels = Map("invoker" -> label, "release" -> KubernetesContainerFactoryProvider.release))
}
}
object KubernetesContainerFactoryProvider extends ContainerFactoryProvider {
val release = loadConfigOrThrow[String]("whisk.helm.release")
val runtimeDeleteTimeout = loadConfigOrThrow[FiniteDuration]("whisk.runtime.delete.timeout")
override def instance(actorSystem: ActorSystem,
logging: Logging,
config: WhiskConfig,
instance: InvokerInstanceId,
parameters: Map[String, Set[String]]): ContainerFactory =
new KubernetesContainerFactory(s"invoker${instance.toInt}", config)(actorSystem, actorSystem.dispatcher, logging)
}