| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.openwhisk.core.scheduler |
| |
| import akka.Done |
| import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown} |
| import akka.stream.ActorMaterializer |
| import akka.util.Timeout |
| import com.typesafe.config.ConfigValueFactory |
| import kamon.Kamon |
| import org.apache.openwhisk.common.Https.HttpsConfig |
| import org.apache.openwhisk.common._ |
| import org.apache.openwhisk.core.WhiskConfig |
| import org.apache.openwhisk.core.WhiskConfig._ |
| import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender} |
| import org.apache.openwhisk.core.connector._ |
| import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext} |
| import org.apache.openwhisk.core.entity._ |
| import org.apache.openwhisk.http.BasicHttpService |
| import org.apache.openwhisk.spi.SpiLoader |
| import org.apache.openwhisk.utils.ExecutionContextFactory |
| import pureconfig.loadConfigOrThrow |
| import spray.json.{DefaultJsonProtocol, _} |
| |
| import scala.concurrent.duration._ |
| import scala.concurrent.{Await, Future} |
| import scala.language.postfixOps |
| import scala.util.{Failure, Success, Try} |
| import pureconfig.generic.auto._ |
| |
| class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)( |
| implicit config: WhiskConfig, |
| actorSystem: ActorSystem, |
| materializer: ActorMaterializer, |
| logging: Logging) |
| extends SchedulerCore { |
| implicit val ec = actorSystem.dispatcher |
| private val authStore = WhiskAuthStore.datastore() |
| |
| val msgProvider = SpiLoader.get[MessagingProvider] |
| val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)) |
| |
| val maxPeek = "" // TODO: TBD |
| val etcdClient = "" // TODO: TBD |
| val watcherService = "" // TODO: TBD |
| val leaseService = "" // TODO: TBD |
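| |
| // A rough sketch of the wiring these placeholders are expected to take once the etcd |
| // layer lands; the class names and config keys below are assumptions, not final APIs: |
| // |
| //   val maxPeek = loadConfigOrThrow[Int](ConfigKeys.schedulerMaxPeek) |
| //   val etcdClient = EtcdClient(loadConfigOrThrow[EtcdConfig](ConfigKeys.etcd)) |
| //   val watcherService = actorSystem.actorOf(WatcherService.props(etcdClient)) |
| //   val leaseService = |
| //     actorSystem.actorOf(LeaseKeepAliveService.props(etcdClient, schedulerId, watcherService)) |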
| |
| implicit val entityStore = WhiskEntityStore.datastore() |
| private val activationStore = |
| SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging) |
| |
| private val ack = { |
| val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None |
| new MessagingActiveAck(producer, schedulerId, sender) |
| } |
| |
| /** Stores an activation in the database. */ |
| private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => { |
| implicit val transid: TransactionId = tid |
| activationStore.store(activation, context)(tid, notifier = None).andThen { |
| case Success(doc) => logging.info(this, s"successfully saved activation ${doc}") |
| case Failure(t) => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}") |
| } |
| } |
| val durationCheckerProvider = "" // TODO: TBD |
| val durationChecker = "" // TODO: TBD |
| |
| override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = { |
| Future.successful((List((schedulerId, 0)), 0)) // TODO: implement once etcdClient is ready |
| } |
| |
| override def getQueueSize: Future[Int] = { |
| Future.successful(0) // TODO: implement once queueManager is ready |
| } |
| |
| override def getQueueStatusData: Future[List[StatusData]] = { |
| Future.successful(List(StatusData("ns", "fqn", 0, "Running", "data"))) // TODO: implement once queueManager is ready |
| } |
| |
| override def disable(): Unit = { |
| logging.info(this, s"Gracefully shutting down the scheduler") |
| // TODO: TBD, after containerManager and queueManager are ready, can implement it |
| } |
| |
| private def getUserLimit(invocationNamespace: String): Future[Int] = { |
| Identity |
| .get(authStore, EntityName(invocationNamespace))(transid) |
| .map { identity => |
| val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt) |
| logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(transid) |
| limit |
| } |
| .andThen { |
| case Failure(_: NoDocumentException) => |
| logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid) |
| case Failure(_: IllegalStateException) => |
| logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid) |
| } |
| } |
| |
| private val etcdWorkerFactory = "" // TODO: TBD |
| |
| /** |
| * This component is in charge of storing data in etcd. |
| * Even if an error occurs, the data can be assumed to eventually become available in etcd through this component. |
| */ |
| val dataManagementService = "" // TODO: TBD |
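| |
| // A hypothetical sketch of the wiring, assuming a DataManagementService actor that |
| // retries failed writes and consumes the etcdWorkerFactory above: |
| // |
| //   val dataManagementService = |
| //     actorSystem.actorOf(DataManagementService.props(watcherService, etcdWorkerFactory)) |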
| |
| val creationJobManagerFactory = "" // TODO: TBD |
| |
| /** |
| * This component is responsible for creating containers for a given action. |
| * It relies on the creationJobManager to manage the container creation job. |
| */ |
| val containerManager = "" // TODO: TBD |
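| |
| // A hypothetical sketch, assuming a ContainerManager actor that delegates creation |
| // jobs to the creationJobManagerFactory above (the props signature is an assumption): |
| // |
| //   val containerManager = actorSystem.actorOf( |
| //     ContainerManager.props(creationJobManagerFactory, msgProvider, schedulerId, etcdClient, config, watcherService)) |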
| |
| /** |
| * This is a factory to create memory queues. |
| * In the new architecture, each action is given its own dedicated queue. |
| */ |
| val memoryQueueFactory = "" // TODO: TBD |
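| |
| // A hypothetical sketch: the factory is expected to be a function that creates a |
| // dedicated MemoryQueue actor per action (the signature below is an assumption): |
| // |
| //   val memoryQueueFactory: (ActorRefFactory, String, FullyQualifiedEntityName) => ActorRef = |
| //     (factory, invocationNamespace, fqn) => factory.actorOf(MemoryQueue.props(...)) |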
| |
| val schedulerConsumer = msgProvider.getConsumer( |
| config, |
| s"scheduler${schedulerId.asString}", |
| s"scheduler${schedulerId.asString}", |
| 500, // TODO: replace with the maxPeek variable once it is available |
| maxPollInterval = TimeLimit.MAX_DURATION + 1.minute) |
| |
| implicit val transid = TransactionId.containerCreation |
| |
| /** |
| * This is one of the scheduler's major components: it manages queues and coordinates requests among the scheduler, controllers, and invokers. |
| */ |
| val queueManager = "" // TODO: TBD |
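| |
| // A hypothetical sketch, assuming a QueueManager actor wired to the pieces above |
| // (the props signature is an assumption): |
| // |
| //   val queueManager = actorSystem.actorOf( |
| //     QueueManager.props(entityStore, getUserLimit, etcdClient, schedulerEndpoints, schedulerId, |
| //       dataManagementService, ack, store, memoryQueueFactory, schedulerConsumer)) |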
| |
| //val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl()) TODO: TBD |
| } |
| |
| case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None) |
| |
| trait SchedulerCore { |
| def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] |
| |
| def getQueueSize: Future[Int] |
| |
| def getQueueStatusData: Future[List[StatusData]] |
| |
| def disable(): Unit |
| } |
| |
| object Scheduler { |
| |
| protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol") |
| |
| /** |
| * The scheduler has two ports, one for akka-remote and the other for akka-grpc. |
| */ |
| def requiredProperties = |
| Map( |
| servicePort -> 8080.toString, |
| schedulerHost -> null, |
| schedulerAkkaPort -> null, |
| schedulerRpcPort -> null, |
| WhiskConfig.actionInvokePerMinuteLimit -> null, |
| WhiskConfig.actionInvokeConcurrentLimit -> null, |
| WhiskConfig.triggerFirePerMinuteLimit -> null) ++ |
| kafkaHosts ++ |
| zookeeperHosts ++ |
| wskApiHost ++ |
| ExecManifest.requiredProperties |
| |
| def initKamon(instance: SchedulerInstanceId): Unit = { |
| // Replace the hostname of the scheduler with the assigned id of the scheduler. |
| val newKamonConfig = Kamon.config |
| .withValue("kamon.environment.host", ConfigValueFactory.fromAnyRef(s"scheduler${instance.asString}")) |
| Kamon.init(newKamonConfig) |
| } |
| |
| def main(args: Array[String]): Unit = { |
| implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext() |
| implicit val actorSystem: ActorSystem = |
| ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec)) |
| implicit val materializer = ActorMaterializer.create(actorSystem) |
| |
| implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this)) |
| |
| // Prepare Kamon shutdown |
| CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () => |
| logger.info(this, s"Shutting down Kamon with coordinated shutdown") |
| Kamon.stopModules().map(_ => Done) |
| } |
| |
| def abort(message: String) = { |
| logger.error(this, message) |
| actorSystem.terminate() |
| Await.result(actorSystem.whenTerminated, 30.seconds) |
| sys.exit(1) |
| } |
| |
| // extract configuration data from the environment |
| implicit val config = new WhiskConfig(requiredProperties) |
| if (!config.isValid) { |
| abort("Bad configuration, cannot start.") |
| } |
| |
| val port = config.servicePort.toInt |
| val host = config.schedulerHost |
| val rpcPort = config.schedulerRpcPort.toInt |
| val akkaPort = config.schedulerAkkaPort.toInt |
| |
| // When deploying multiple instances (scale-out), the instance id must be passed so each scheduler is uniquely identified. |
| require(args.length >= 1, "scheduler instance required") |
| val instanceId = SchedulerInstanceId(args(0)) |
| |
| initKamon(instanceId) |
| |
| val msgProvider = SpiLoader.get[MessagingProvider] |
| |
| Seq( |
| ("scheduler" + instanceId.asString, "actions", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)), |
| ("creationAck" + instanceId.asString, "creationAck", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))) |
| .foreach { |
| case (topic, topicConfigurationKey, maxMessageBytes) => |
| if (msgProvider.ensureTopic(config, topic, topicConfigurationKey, maxMessageBytes).isFailure) { |
| abort(s"failure during msgProvider.ensureTopic for topic $topic") |
| } |
| } |
| |
| ExecManifest.initialize(config) match { |
| case Success(_) => |
| val schedulerEndpoints = SchedulerEndpoints(host, rpcPort, akkaPort) |
| // Create scheduler |
| val scheduler = new Scheduler(instanceId, schedulerEndpoints) |
| |
| // TODO: Add Akka-grpc handler |
| val httpsConfig = |
| if (Scheduler.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None |
| |
| BasicHttpService.startHttpService(FPCSchedulerServer.instance(scheduler).route, port, httpsConfig)( |
| actorSystem, |
| ActorMaterializer.create(actorSystem)) |
| |
| case Failure(t) => |
| abort(s"Invalid runtimes manifest: $t") |
| } |
| } |
| } |
| case class SchedulerEndpoints(host: String, rpcPort: Int, akkaPort: Int) { |
| require(rpcPort != 0 || akkaPort != 0, "at least one of rpcPort and akkaPort must be non-zero") |
| def asRpcEndpoint: String = s"$host:$rpcPort" |
| def asAkkaEndpoint: String = s"$host:$akkaPort" |
| |
| def getRemoteRef(name: String)(implicit context: ActorRefFactory): ActorSelection = { |
| implicit val ec = context.dispatcher |
| |
| val path = s"akka.tcp://scheduler-actor-system@${asAkkaEndpoint}/user/${name}" |
| context.actorSelection(path) |
| } |
| |
| def serialize = SchedulerEndpoints.serdes.write(this).compactPrint |
| } |
| |
| object SchedulerEndpoints extends DefaultJsonProtocol { |
| implicit val serdes = jsonFormat(SchedulerEndpoints.apply, "host", "rpcPort", "akkaPort") |
| def parse(endpoints: String) = Try(serdes.read(endpoints.parseJson)) |
| } |
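| |
| // Example round trip through SchedulerEndpoints' JSON serdes (illustrative values): |
| // |
| //   val endpoints = SchedulerEndpoints("10.0.0.1", rpcPort = 13001, akkaPort = 25520) |
| //   val json = endpoints.serialize   // e.g. {"host":"10.0.0.1","rpcPort":13001,"akkaPort":25520} |
| //   SchedulerEndpoints.parse(json)   // Success(endpoints) |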
| |
| case class SchedulerStates(sid: SchedulerInstanceId, queueSize: Int, endpoints: SchedulerEndpoints) { |
| private implicit val askTimeout = Timeout(5.seconds) |
| |
| def getRemoteRef(name: String)(implicit context: ActorRefFactory): ActorSelection = { |
| implicit val ec = context.dispatcher |
| |
| val path = s"akka.tcp://scheduler-actor-system@${endpoints.asAkkaEndpoint}/user/${name}" |
| context.actorSelection(path) |
| } |
| |
| def getSchedulerId(): SchedulerInstanceId = sid |
| |
| def serialize = SchedulerStates.serdes.write(this).compactPrint |
| } |
| |
| object SchedulerStates extends DefaultJsonProtocol { |
| private implicit val endpointsSerde = SchedulerEndpoints.serdes |
| implicit val serdes = jsonFormat(SchedulerStates.apply, "sid", "queueSize", "endpoints") |
| |
| def parse(states: String) = Try(serdes.read(states.parseJson)) |
| } |