blob: d6b969a296002117025ea53c27347904611af1eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package org.apache.toree.boot.layer
import org.apache.pekko.actor.{ActorRef, Props, ActorSystem}
import org.apache.toree.kernel.protocol.v5.dispatch.StatusDispatch
import org.apache.toree.kernel.protocol.v5.handler.{GenericSocketMessageHandler, ShutdownHandler}
import org.apache.toree.kernel.protocol.v5.kernel.{SimpleActorLoader, ActorLoader}
import org.apache.toree.communication.security.{SecurityActorType, SignatureManagerActor}
import org.apache.toree.kernel.protocol.v5.kernel.socket._
import org.apache.toree.kernel.protocol.v5._
import org.apache.toree.kernel.protocol.v5.content.{CommClose, CommMsg, CommOpen}
import org.apache.toree.kernel.protocol.v5.relay.KernelMessageRelay
import org.apache.toree.utils.LogLike
import com.typesafe.config.Config
import play.api.libs.json.Json
/**
* Represents the raw initialization needed to send a "starting" message.
*/
trait BareInitialization {
/**
* Initializes and registers all objects needed to get the kernel to send a
* "starting" message.
*
* @param config The config used for initialization
* @param actorSystemName The name to use for the actor system
*/
def initializeBare(config: Config, actorSystemName: String):
(ActorSystem, ActorLoader, ActorRef, ActorRef)
}
/**
* Represents the standard implementation of BareInitialization.
*/
trait StandardBareInitialization extends BareInitialization { this: LogLike =>
/**
* Initializes and registers all objects needed to get the kernel to send a
* "starting" message.
*
* @param config The config used for initialization
* @param actorSystemName The name to use for the actor system
*/
def initializeBare(config: Config, actorSystemName: String) = {
val actorSystem = createActorSystem(actorSystemName)
val actorLoader = createActorLoader(actorSystem)
val (kernelMessageRelayActor, _, statusDispatch, _, _) =
initializeCoreActors(config, actorSystem, actorLoader)
createSockets(config, actorSystem, actorLoader)
(actorSystem, actorLoader, kernelMessageRelayActor, statusDispatch)
}
protected def createActorSystem(actorSystemName: String): ActorSystem = {
logger.info("Initializing internal actor system")
ActorSystem(actorSystemName)
}
protected def createActorLoader(actorSystem: ActorSystem): ActorLoader = {
logger.debug("Creating Simple Actor Loader")
SimpleActorLoader(actorSystem)
}
/**
* Does minimal setup in order to send the "starting" status message over
* the IOPub socket
*/
protected def initializeCoreActors(
config: Config, actorSystem: ActorSystem, actorLoader: ActorLoader
) = {
logger.debug("Creating kernel message relay actor")
val kernelMessageRelayActor = actorSystem.actorOf(
Props(
classOf[KernelMessageRelay], actorLoader, true,
Map(
CommOpen.toTypeString -> MessageType.Incoming.CommOpen.toString,
CommMsg.toTypeString -> MessageType.Incoming.CommMsg.toString,
CommClose.toTypeString -> MessageType.Incoming.CommClose.toString
),
Map(
CommOpen.toTypeString -> MessageType.Outgoing.CommOpen.toString,
CommMsg.toTypeString -> MessageType.Outgoing.CommMsg.toString,
CommClose.toTypeString -> MessageType.Outgoing.CommClose.toString
)
),
name = SystemActorType.KernelMessageRelay.toString
)
logger.debug("Creating signature manager actor")
val sigKey = config.getString("key")
val sigScheme = config.getString("signature_scheme")
logger.debug("Key = " + sigKey)
logger.debug("Scheme = " + sigScheme)
val signatureManagerActor = actorSystem.actorOf(
Props(
classOf[SignatureManagerActor], sigKey, sigScheme.replace("-", "")
),
name = SecurityActorType.SignatureManager.toString
)
logger.debug("Creating status dispatch actor")
val statusDispatch = actorSystem.actorOf(
Props(classOf[StatusDispatch], actorLoader),
name = SystemActorType.StatusDispatch.toString
)
logger.debug("Creating shutdown handler and sender actors")
val shutdownHandler = actorSystem.actorOf(
Props(classOf[ShutdownHandler], actorLoader),
name = MessageType.Incoming.ShutdownRequest.toString
)
val shutdownSender = actorSystem.actorOf(
Props(classOf[GenericSocketMessageHandler], actorLoader, SocketType.Control),
name = MessageType.Outgoing.ShutdownReply.toString
)
(kernelMessageRelayActor, signatureManagerActor, statusDispatch, shutdownHandler, shutdownSender)
}
protected def createSockets(
config: Config, actorSystem: ActorSystem, actorLoader: ActorLoader
): Unit = {
logger.debug("Creating sockets")
val socketConfig: SocketConfig = SocketConfig.fromConfig(config)
logger.info("Connection Profile: "
+ Json.prettyPrint(Json.toJson(socketConfig)))
logger.debug("Constructing ServerSocketFactory")
val socketFactory = new SocketFactory(socketConfig)
logger.debug("Initializing Heartbeat on port " +
socketConfig.hb_port)
val heartbeatActor = actorSystem.actorOf(
Props(classOf[Heartbeat], socketFactory),
name = SocketType.Heartbeat.toString
)
logger.debug("Initializing Stdin on port " +
socketConfig.stdin_port)
val stdinActor = actorSystem.actorOf(
Props(classOf[Stdin], socketFactory, actorLoader),
name = SocketType.StdIn.toString
)
logger.debug("Initializing Shell on port " +
socketConfig.shell_port)
val shellActor = actorSystem.actorOf(
Props(classOf[Shell], socketFactory, actorLoader),
name = SocketType.Shell.toString
)
logger.debug("Initializing Control on port " +
socketConfig.control_port)
val controlActor = actorSystem.actorOf(
Props(classOf[Control], socketFactory, actorLoader),
name = SocketType.Control.toString
)
logger.debug("Initializing IOPub on port " +
socketConfig.iopub_port)
val ioPubActor = actorSystem.actorOf(
Props(classOf[IOPub], socketFactory),
name = SocketType.IOPub.toString
)
}
}