blob: 437beb314fa4f3045127b1c21d6e1944f254692a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.invoker
import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.http.scaladsl.server.Route
import com.typesafe.config.ConfigValueFactory
import kamon.Kamon
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.WhiskConfig._
import org.apache.openwhisk.core.connector.{MessageProducer, MessagingProvider}
import org.apache.openwhisk.core.containerpool.{Container, ContainerPoolConfig}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
import org.apache.openwhisk.spi.{Spi, SpiLoader}
import org.apache.openwhisk.utils.ExecutionContextFactory
import pureconfig._
import pureconfig.generic.auto._
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Try
case class CmdLineArgs(uniqueName: Option[String] = None,
id: Option[Int] = None,
displayedName: Option[String] = None,
overwriteId: Option[Int] = None)
object Invoker {
/**
* Collect logs after the activation has finished.
*
* This method is called after an activation has finished. The logs gathered here are stored along the activation
* record in the database.
*
* @param transid transaction the activation ran in
* @param user the user who ran the activation
* @param activation the activation record
* @param container container used by the activation
* @param action action that was activated
* @return logs for the given activation
*/
trait LogsCollector {
def logsToBeCollected(action: ExecutableWhiskAction): Boolean = action.limits.logs.asMegaBytes != 0.MB
def apply(transid: TransactionId,
user: Identity,
activation: WhiskActivation,
container: Container,
action: ExecutableWhiskAction): Future[ActivationLogs]
}
protected val protocol = loadConfigOrThrow[String]("whisk.invoker.protocol")
val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
/**
* An object which records the environment variables required for this component to run.
*/
def requiredProperties =
Map(servicePort -> 8080.toString) ++
ExecManifest.requiredProperties ++
kafkaHosts ++
zookeeperHosts ++
wskApiHost
def initKamon(instance: Int): Unit = {
// Replace the hostname of the invoker to the assigned id of the invoker.
val newKamonConfig = Kamon.config
.withValue("kamon.environment.host", ConfigValueFactory.fromAnyRef(s"invoker$instance"))
Kamon.init(newKamonConfig)
}
def main(args: Array[String]): Unit = {
ConfigMXBean.register()
implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
implicit val actorSystem: ActorSystem =
ActorSystem(name = "invoker-actor-system", defaultExecutionContext = Some(ec))
implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
val poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
val limitConfig: ConcurrencyLimitConfig = loadConfigOrThrow[ConcurrencyLimitConfig](ConfigKeys.concurrencyLimit)
// Prepare Kamon shutdown
CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
logger.info(this, s"Shutting down Kamon with coordinated shutdown")
Kamon.stopModules().map(_ => Done)
}
// load values for the required properties from the environment
implicit val config = new WhiskConfig(requiredProperties)
def abort(message: String) = {
logger.error(this, message)(TransactionId.invoker)
actorSystem.terminate()
Await.result(actorSystem.whenTerminated, 30.seconds)
sys.exit(1)
}
if (!config.isValid) {
abort("Bad configuration, cannot start.")
}
val execManifest = ExecManifest.initialize(config)
if (execManifest.isFailure) {
logger.error(this, s"Invalid runtimes manifest: ${execManifest.failed.get}")
abort("Bad configuration, cannot start.")
}
/** Returns Some(s) if the string is not empty with trimmed whitespace, None otherwise. */
def nonEmptyString(s: String): Option[String] = {
val trimmed = s.trim
if (trimmed.nonEmpty) Some(trimmed) else None
}
// process command line arguments
// We accept the command line grammar of:
// Usage: invoker [options] [<proposedInvokerId>]
// --uniqueName <value> a unique name to dynamically assign Kafka topics from Zookeeper
// --displayedName <value> a name to identify this invoker via invoker health protocol
// --id <value> proposed invokerId
// --overwriteId <value> proposed invokerId to re-write with uniqueName in Zookeeper,
// DO NOT USE overwriteId unless sure invokerId does not exist for other uniqueName
def parse(ls: List[String], c: CmdLineArgs): CmdLineArgs = {
ls match {
case "--uniqueName" :: uniqueName :: tail =>
parse(tail, c.copy(uniqueName = nonEmptyString(uniqueName)))
case "--displayedName" :: displayedName :: tail =>
parse(tail, c.copy(displayedName = nonEmptyString(displayedName)))
case "--id" :: id :: tail if Try(id.toInt).isSuccess =>
parse(tail, c.copy(id = Some(id.toInt)))
case "--overwriteId" :: overwriteId :: tail if Try(overwriteId.toInt).isSuccess =>
parse(tail, c.copy(overwriteId = Some(overwriteId.toInt)))
case Nil => c
case _ => abort(s"Error processing command line arguments $ls")
}
}
val cmdLineArgs = parse(args.toList, CmdLineArgs())
logger.info(this, "Command line arguments parsed to yield " + cmdLineArgs)
val assignedInvokerId = cmdLineArgs match {
// --id is defined with a valid value, use this id directly.
case CmdLineArgs(_, Some(id), _, _) =>
logger.info(this, s"invokerReg: using proposedInvokerId $id")
id
// --uniqueName is defined with a valid value, id is empty, assign an id via zookeeper
case CmdLineArgs(Some(unique), None, _, overwriteId) =>
if (config.zookeeperHosts.startsWith(":") || config.zookeeperHosts.endsWith(":")) {
abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHosts})")
}
new InstanceIdAssigner(config.zookeeperHosts).setAndGetId(unique, overwriteId)
case _ => abort(s"Either --id or --uniqueName must be configured with correct values")
}
initKamon(assignedInvokerId)
val topicBaseName = "invoker"
val topicName = topicPrefix + topicBaseName + assignedInvokerId
val maxMessageBytes = Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)
val invokerInstance =
InvokerInstanceId(assignedInvokerId, cmdLineArgs.uniqueName, cmdLineArgs.displayedName, poolConfig.userMemory)
val msgProvider = SpiLoader.get[MessagingProvider]
if (msgProvider
.ensureTopic(config, topic = topicName, topicConfig = topicBaseName, maxMessageBytes = maxMessageBytes)
.isFailure) {
abort(s"failure during msgProvider.ensureTopic for topic $topicName")
}
val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
val invoker = try {
SpiLoader.get[InvokerProvider].instance(config, invokerInstance, producer, poolConfig, limitConfig)
} catch {
case e: Exception => abort(s"Failed to initialize reactive invoker: ${e.getMessage}")
}
val port = config.servicePort.toInt
val httpsConfig =
if (Invoker.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.invoker.https")) else None
val invokerServer = SpiLoader.get[InvokerServerProvider].instance(invoker)
BasicHttpService.startHttpService(invokerServer.route, port, httpsConfig)(actorSystem)
}
}
/**
* An Spi for providing invoker implementation.
*/
trait InvokerProvider extends Spi {
def instance(config: WhiskConfig,
instance: InvokerInstanceId,
producer: MessageProducer,
poolConfig: ContainerPoolConfig,
limitsConfig: ConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore
}
// this trait can be used to add common implementation
trait InvokerCore {
def enable(): Route
def disable(): Route
}
/**
* An Spi for providing RestAPI implementation for invoker.
* The given invoker may require corresponding RestAPI implementation.
*/
trait InvokerServerProvider extends Spi {
def instance(
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService
}