package org.apache.openwhisk.core.containerpool.kubernetes
import java.time.format.DateTimeFormatterBuilder
import java.time.{Instant, ZoneId}
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.{Path, Query}
import{ActorMaterializer, Attributes, Outlet, SourceShape}
import akka.util.ByteString
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.utils.Serialization
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
import okhttp3.{Call, Callback, Request, Response}
import okio.BufferedSource
import org.apache.openwhisk.common.{ConfigMapValue, Logging, TransactionId}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.containerpool.docker.ProcessRunner
import org.apache.openwhisk.core.containerpool.{ContainerAddress, ContainerId}
import org.apache.openwhisk.core.entity.ByteSize
import org.apache.openwhisk.core.entity.size._
import pureconfig.loadConfigOrThrow
import spray.json.DefaultJsonProtocol._
import spray.json._
import scala.annotation.tailrec
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{blocking, ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
/**
 * Configuration for kubernetes client command timeouts.
 */
// Holds two durations, `run` and `logs` (presumably the timeouts for the client commands of
// the same names — TODO confirm against the code that reads this config).
case class KubernetesClientTimeoutConfig(
  run: Duration,
  logs: Duration)
/**
 * Configuration for kubernetes invoker-agent
 */
// Two settings for the invoker-agent: an on/off switch and a port number.
case class KubernetesInvokerAgentConfig(
  enabled: Boolean,
  port: Int)
/**
 * Configuration for node affinity for the pods that execute user action containers.
 * The key,value pair should match the <key,value> pair with which the invoker worker nodes
 * are labeled in the Kubernetes cluster. The default pair is <openwhisk-role,invoker>,
 * but a deployment may override this default if needed.
 */
// `enabled` toggles the affinity; `key` and `value` carry the node label pair to match.
case class KubernetesInvokerNodeAffinity(
  enabled: Boolean,
  key: String,
  value: String)
/**
 * General configuration for kubernetes client
 */
// Aggregates every setting read by KubernetesClient.
case class KubernetesClientConfig(
  // command timeout settings
  timeouts: KubernetesClientTimeoutConfig,
  // invoker-agent settings
  invokerAgent: KubernetesInvokerAgentConfig,
  // node affinity for user pods; handed to the pod builder by KubernetesClient
  userPodNodeAffinity: KubernetesInvokerNodeAffinity,
  // checked by KubernetesClient.toContainer when it decides whether to set up a port forward
  portForwardingEnabled: Boolean,
  // optional pod template, handed to the pod builder; absent by default
  podTemplate: Option[ConfigMapValue] = None)
/**
 * Serves as an interface to the Kubernetes API by proxying its REST API and/or invoking the kubectl CLI.
 *
 * Be cautious with the ExecutionContext passed to this, as many
 * operations are blocking.
 *
 * You only need one instance (and you shouldn't get more).
 */
// Constructor: `config` defaults to the value loaded from ConfigKeys.kubernetes; the
// ExecutionContext is passed explicitly, while Logging and ActorSystem are implicit.
class KubernetesClient(
config: KubernetesClientConfig = loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes))(
executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
extends KubernetesApi
with ProcessRunner {
// Re-expose the supplied ExecutionContext as an implicit for the Future-based methods below.
implicit protected val ec = executionContext
// Implicit ActorMaterializer for this client.
implicit protected val am = ActorMaterializer()
// REST client; declared implicit, and KubernetesRestLogSourceStage takes one as an implicit parameter.
implicit protected val kubeRestClient = new DefaultKubernetesClient(
new ConfigBuilder()
// Pod spec builder wired with the REST client, the configured node affinity and the optional pod template.
private val podBuilder = new WhiskPodBuilder(kubeRestClient, config.userPodNodeAffinity, config.podTemplate)
// Builds a pod spec from the given name, image, memory, environment and labels, then works with
// the REST client inside a blocking Future. Any failure is logged and replaced by a generic
// "Failed to create pod" exception.
def run(name: String,
image: String,
memory: ByteSize = 256.MB,
environment: Map[String, String] = Map.empty,
labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer] = {
val pod = podBuilder.buildPodSpec(name, image, memory, environment, labels)
// With extra logging enabled for this transaction, the pod spec is rendered as YAML for the log.
if (transid.meta.extraLogging) {, s"Pod spec being created\n${Serialization.asYaml(pod)}")
val namespace = kubeRestClient.getNamespace
// The REST call is wrapped in `blocking` so the ExecutionContext knows this task may block.
Future {
blocking {
val createdPod = kubeRestClient.pods
}.recoverWith {
case e =>
// NOTE(review): the original exception is logged here but is not attached as the cause of the new one.
log.error(this, s"Failed create pod for '$name': ${e.getClass} - ${e.getMessage}")
Future.failed(new Exception(s"Failed to create pod '$name'"))
// rm overload taking a KubernetesContainer: runs inside a blocking Future and discards the
// result, so the returned Future completes with Unit.
def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
Future {
blocking {
}.map(_ => ())
// rm overload taking a pod name; unlike the other overloads it needs no TransactionId.
// Runs inside a blocking Future and maps the result to Unit.
def rm(podName: String): Future[Unit] = {
Future {
blocking {
}.map(_ => ())
// rm overload that selects its targets by label (`key` -> `value`). Runs inside a blocking
// Future and maps the result to Unit.
// NOTE(review): `ensureUnpaused` is not referenced in the visible body — confirm whether it is used.
def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = {
Future {
blocking {
// restricts the operation to resources carrying the given label
.withLabel(key, value)
}.map(_ => ())
// The basic KubernetesClient has nothing to do on suspend: it hands back an
// already-completed Future[Unit] without touching the container.
def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] =
  Future.successful(())
// The basic KubernetesClient has nothing to do on resume: it hands back an
// already-completed Future[Unit] without touching the container.
def resume(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] =
  Future.successful(())
// Returns a stream of TypedLogLine backed by a KubernetesRestLogSourceStage.
// `sinceTime` and `waitForSentinel` are passed straight through to the stage.
def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
implicit transid: TransactionId): Source[TypedLogLine, Any] = {
log.debug(this, "Parsing logs from Kubernetes Graph Stage…")
// The stage picks up the implicit kubeRestClient declared on this class.
.fromGraph(new KubernetesRestLogSourceStage(, sinceTime, waitForSentinel))
// Converts a Kubernetes Pod into a KubernetesContainer: the pod name becomes the ContainerId,
// and the host IP and native container id are read from the pod status.
protected def toContainer(pod: Pod): KubernetesContainer = {
val id = ContainerId(pod.getMetadata.getName)
// A port forward is only considered when enabled in the config; otherwise this is None.
val portFwd = if (config.portForwardingEnabled) {
} else None
// With a port forward in place, the container is addressed through localhost on the forwarded port.
val addr = portFwd
.map(fwd => ContainerAddress("localhost", fwd.getLocalPort))
val workerIP = pod.getStatus.getHostIP
// Extract the native (docker or containerd) containerId for the container
// By convention, kubernetes adds a docker:// prefix when using docker as the low-level container engine
// NOTE(review): get(0) assumes the pod reports at least one container status; only the docker:// prefix is stripped.
val nativeContainerId = pod.getStatus.getContainerStatuses.get(0).getContainerID.stripPrefix("docker://")
// The container receives this client as its implicit KubernetesApi handle.
implicit val kubernetes = this
new KubernetesContainer(id, addr, workerIP, nativeContainerId, portFwd)
// Companion object holding the timestamp format used for Kubernetes log lines and the helpers
// that parse and format with it.
object KubernetesClient {
// Necessary, as Kubernetes uses nanosecond precision in logs, but java.time.Instant toString uses milliseconds
val K8STimestampFormat = new DateTimeFormatterBuilder()
// Parses a timestamp string into an Instant; a parse failure is returned as a Failure, not thrown.
def parseK8STimestamp(ts: String): Try[Instant] =
// Formats an Instant as a timestamp string; a formatting failure is returned as a Failure, not thrown.
def formatK8STimestamp(ts: Instant): Try[String] =
// Operations a Kubernetes backend must offer to the container pool. Every operation is
// asynchronous: it returns a Future, or a stream in the case of `logs`.
trait KubernetesApi {
// Starts a pod for `image` under `name` and completes with the resulting KubernetesContainer.
def run(name: String,
image: String,
memory: ByteSize,
environment: Map[String, String] = Map.empty,
labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer]
// Removal, addressed by container, ...
def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit]
// ... by pod name (no TransactionId required), ...
def rm(podName: String): Future[Unit]
// ... or by label key/value pair.
def rm(key: String, value: String, ensureUnpaused: Boolean)(implicit transid: TransactionId): Future[Unit]
// Suspend/resume hooks; the basic KubernetesClient in this file implements both as no-ops.
def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit]
def resume(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit]
// Streams the container's log lines, optionally starting from `sinceTime`.
def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
implicit transid: TransactionId): Source[TypedLogLine, Any]
// Companion helpers for the log source stage: retry timing, URL construction and line parsing.
object KubernetesRestLogSourceStage {
import KubernetesClient.{formatK8STimestamp, parseK8STimestamp}
// Pause between log fetch attempts.
val retryDelay = 100.milliseconds
// Timer keys used by the stage; K8SRestLogRetry is the key scheduled for a retry.
sealed trait K8SRestLogTimingEvent
case object K8SRestLogRetry extends K8SRestLogTimingEvent
// Builds the REST path of a pod's log endpoint:
// /api/v1/namespaces/<namespace>/pods/<containerId>/log
def constructPath(namespace: String, containerId: String): Path = {
  val namespaceRoot = Path / "api" / "v1" / "namespaces" / namespace
  val podRoot = namespaceRoot / "pods" / containerId
  podRoot / "log"
}
// Builds the query string for the log request: `timestamps=true` is always present, and
// `sinceTime` is added when a start time is given and can be formatted.
// NOTE(review): `waitForSentinel` is not referenced in the visible body — confirm whether it is used.
def constructQuery(sinceTime: Option[Instant], waitForSentinel: Boolean): Query = {
// A timestamp that fails to format is silently dropped (Try -> Option).
val sinceTimestamp = sinceTime.flatMap(time => formatK8STimestamp(time).toOption)
Query(Map("timestamps" -> "true") ++ => "sinceTime" -> time))
// Recursively reads lines from `src` until it is exhausted, turning each "<timestamp> <message>"
// line into a TypedLogLine and accumulating the results in `lines`.
// Lines that are empty, have an unparseable timestamp, or are rejected by isRelevantLogLine are skipped.
def readLines(src: BufferedSource,
lastTimestamp: Option[Instant],
lines: Seq[TypedLogLine] = Seq.empty[TypedLogLine]): Seq[TypedLogLine] = {
if (!src.exhausted()) {
(for {
// Option(...) guards against readUtf8Line() returning null
line <- Option(src.readUtf8Line()) if !line.isEmpty
// NOTE(review): for a line without a space, indexOf returns -1 and the substring below throws.
timestampDelimiter = line.indexOf(" ")
// Kubernetes is ignoring nanoseconds in sinceTime, so we have to filter additionally here
rawTimestamp = line.substring(0, timestampDelimiter)
timestamp <- parseK8STimestamp(rawTimestamp).toOption if isRelevantLogLine(lastTimestamp, timestamp)
msg = line.substring(timestampDelimiter + 1)
stream = "stdout" // TODO - when we can distinguish stderr:
} yield {
TypedLogLine(timestamp, stream, msg)
}) match {
case Some(logLine) =>
// Accepted line: its timestamp becomes the new lower bound for the following lines.
readLines(src, Option(logLine.time), lines :+ logLine)
case None =>
// we may have skipped a line for filtering conditions only; keep going
readLines(src, lastTimestamp, lines)
} else {
// Decides whether a log line stamped `newTimestamp` should be kept, given the last timestamp
// seen so far (if any). The decision branches on whether a previous timestamp exists.
def isRelevantLogLine(lastTimestamp: Option[Instant], newTimestamp: Instant): Boolean =
lastTimestamp match {
case Some(last) =>
case None =>
// Source stage that emits TypedLogLine elements for the container `id`, using the implicit
// REST client. `sinceTime` seeds the first fetch; `waitForSentinel` is forwarded to constructQuery.
final class KubernetesRestLogSourceStage(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
implicit val kubeRestClient: DefaultKubernetesClient)
extends GraphStage[SourceShape[TypedLogLine]] { stage =>
import KubernetesRestLogSourceStage._
// Single outlet of this source.
val out = Outlet[TypedLogLine]("K8SHttpLogging.out")
override val shape: SourceShape[TypedLogLine] = SourceShape.of(out)
override protected def initialAttributes: Attributes ="KubernetesHttpLogSource")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogicWithLogging(shape) { logic =>
// Lines already read but not yet pushed downstream.
private val queue = mutable.Queue.empty[TypedLogLine]
// Timestamp of the most recent line seen; starts at `sinceTime` and is updated after each read.
private var lastTimestamp = sinceTime
// Issues an asynchronous HTTP GET through the REST client's HTTP client; the response is
// handled by LogFetchCallback.
def fetchLogs(): Unit =
try {
// Path and query are derived from the namespace, the container id and the last seen timestamp.
val path = constructPath(kubeRestClient.getNamespace, id.asString)
val query = constructQuery(lastTimestamp, waitForSentinel)
log.debug("*** Fetching K8S HTTP Logs w/ Path: {} Query: {}", path, query)
// NOTE(review): as visible here, `url` is built from the master URL only — confirm `path` and `query` are applied.
val url = Uri(kubeRestClient.getMasterUrl.toString)
val request = new Request.Builder().get().url(url.toString).build
// enqueue() hands the call to the HTTP client; the result arrives through the callback.
kubeRestClient.getHttpClient.newCall(request).enqueue(new LogFetchCallback())
} catch {
case NonFatal(e) =>
throw e
// Failure handler for log fetching: a socket timeout is only logged as a warning, any other
// error is logged at error level.
def onFailure(e: Throwable): Unit = e match {
case _: SocketTimeoutException =>
log.warning("* Logging socket to Kubernetes timed out.") // this should only happen with follow behavior
case _ =>
log.error(e, "* Retrieving the logs from Kubernetes failed.")
// Async callback through which freshly read lines are handed to the stage logic.
// If the outlet is available the first line is handled directly and the rest are queued;
// otherwise every line is queued until downstream pulls.
val emitCallback: AsyncCallback[Seq[TypedLogLine]] = getAsyncCallback[Seq[TypedLogLine]] {
case lines @ firstLine +: restOfLines =>
if (isAvailable(out)) {
log.debug("* Lines Available & output ready; pushing {} (remaining: {})", firstLine, restOfLines)
queue ++= restOfLines
} else {
log.debug("* Output isn't ready; queueing lines: {}", lines)
queue ++= lines
// Nothing was read.
case Nil =>
log.debug("* Empty lines returned.")
// OkHttp callback for the log request: failures are forwarded to the stage's onFailure,
// successful responses are parsed into log lines.
class LogFetchCallback extends Callback {
override def onFailure(call: Call, e: IOException): Unit = logic.onFailure(e)
override def onResponse(call: Call, response: Response): Unit =
try {
// Parse the body, filtering against the last timestamp already seen.
val lines = readLines(response.body.source, lastTimestamp)
log.debug("* Read & decoded lines for K8S HTTP: {}", lines)
// Remember the newest timestamp so the next fetch can start from there.
// NOTE(review): lastTimestamp is written directly from this HTTP callback — confirm that is safe for the stage.
lines.lastOption.foreach { line =>
log.debug("* Updating lastTimestamp (sinceTime) to {}", Option(line.time))
lastTimestamp = Option(line.time)
} catch {
case NonFatal(e) =>
log.error(e, "* Reading Kubernetes HTTP Response failed.")
throw e
// Logs the line at debug level and pushes it to the outlet.
def pushLine(line: TypedLogLine): Unit = {
log.debug("* Pushing a chunk of kubernetes logging: {}", line)
push(out, line)
// Handler for downstream demand: the branch taken depends on whether lines are still queued.
new OutHandler {
override def onPull(): Unit = {
// if we still have lines queued up, return those; else make a new HTTP read.
if (queue.nonEmpty) {
log.debug("* onPull, nonEmpty queue... pushing line")
} else {
log.debug("* onPull, empty queue... fetching logs")
// Schedules a single K8SRestLogRetry timer after `retryDelay`; the timer is handled in onTimer.
def retryLogs(): Unit = {
// Pause before retrying so we don't thrash Kubernetes w/ HTTP requests
log.debug("* Scheduling a retry of log fetch in {}", retryDelay)
scheduleOnce(K8SRestLogRetry, retryDelay)
// Timer dispatch: K8SRestLogRetry is the key scheduled by retryLogs; any other key is
// unexpected and only logged as a warning.
override protected def onTimer(timerKey: Any): Unit = timerKey match {
case K8SRestLogRetry =>
log.debug("* Timer trigger for log fetch retry")
case x =>
log.warning("* Got a timer trigger with an unknown key: {}", x)
// A single log line: its timestamp, the stream name it belongs to, and the message text.
protected[core] final case class TypedLogLine(time: Instant, stream: String, log: String) {
import KubernetesClient.formatK8STimestamp
// JSON object with the fields "time", "stream" and "log"; a timestamp that cannot be formatted
// is rendered as an empty string.
lazy val toJson: JsObject =
JsObject("time" -> formatK8STimestamp(time).getOrElse("").toJson, "stream" -> stream.toJson, "log" -> log.toJson)
// Compact (single-line) rendering of toJson.
lazy val jsonPrinted: String = toJson.compactPrint
// Length of the compact JSON string, in characters.
lazy val jsonSize: Int = jsonPrinted.length
/**
 * Returns a ByteString representation of the json for this Log Line
 */
// Eager val: the ByteString is built when the instance is constructed, which forces
// `jsonPrinted` (and therefore `toJson`) at that point.
val toByteString = ByteString(jsonPrinted)
// Human-readable form: "<timestamp> <stream>: <trimmed message>".
// The formatted timestamp is a Try; fall back to an empty string on failure (as `toJson`
// does) so that toString can never throw.
override def toString = {
  val renderedTime = formatK8STimestamp(time).getOrElse("")
  s"$renderedTime $stream: ${log.trim}"
}
// Companion object providing the spray-json support for TypedLogLine.
protected[core] object TypedLogLine {
import KubernetesClient.{parseK8STimestamp, K8STimestampFormat}
// Reads an Instant from a JSON value. Only JSON strings are accepted, and they are parsed with
// parseK8STimestamp; the error messages name the offending input and the expected format.
def readInstant(json: JsValue): Instant = json match {
case JsString(str) =>
parseK8STimestamp(str) match {
case Success(time) =>
case Failure(e) =>
s"Could not parse a java.time.Instant from $str (Expected in format: $K8STimestampFormat: $e")
// Any non-string JSON value is rejected as a deserialization error.
case _ =>
deserializationError(s"Could not parse a java.time.Instant from $json (Expected in format: $K8STimestampFormat)")
// spray-json format for TypedLogLine: writing delegates to TypedLogLine.toJson; reading expects
// a JSON object with the fields "time", "stream" and "log".
implicit val typedLogLineFormat = new RootJsonFormat[TypedLogLine] {
override def write(obj: TypedLogLine): JsValue = obj.toJson
override def read(json: JsValue): TypedLogLine = {
val obj = json.asJsObject
val fields = obj.fields
// NOTE(review): fields(...) is a plain Map lookup, so a missing field fails with the Map's own
// exception rather than a deserializationError — confirm this is acceptable.
TypedLogLine(readInstant(fields("time")), fields("stream").convertTo[String], fields("log").convertTo[String])