* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License
package org.apache.toree.kernel.protocol.v5.relay
import org.apache.pekko.testkit.{ImplicitSender, TestKit, TestProbe}
import org.apache.toree.communication.ZMQMessage
import org.apache.toree.kernel.protocol.v5._
import org.apache.toree.kernel.protocol.v5.kernel.{ActorLoader, Utilities}
import Utilities._
import org.mockito.Mockito._
import org.scalatest.concurrent.{PatienceConfiguration, ScalaFutures}
import org.scalatestplus.mockito.MockitoSugar
import org.scalatest.funspec.AnyFunSpecLike
import org.scalatest.matchers.should.Matchers
import org.scalatest.BeforeAndAfterEach
import org.mockito.ArgumentMatchers.{eq => mockEq}
import org.mockito.AdditionalMatchers.{not => mockNot}
import org.apache.toree.kernel.protocol.v5.KernelMessage
import scala.concurrent._
import org.apache.pekko.pattern.pipe
import scala.util.Random
import test.utils._
class KernelMessageRelaySpec extends TestKit(
with ImplicitSender with AnyFunSpecLike with Matchers with MockitoSugar
with BeforeAndAfterEach with ScalaFutures {
private val IncomingMessageType = MessageType.Incoming.CompleteRequest.toString
private val OutgoingMessageType = MessageType.Outgoing.CompleteReply.toString
private val header: Header = Header("<UUID>", "<USER>", "<SESSION>",
"<TYPE>", "<VERSION>")
private val parentHeader: Header = Header("<PARENT-UUID>", "<PARENT-USER>",
private val incomingKernelMessage: KernelMessage = KernelMessage(Seq("<ID>".getBytes),
"<SIGNATURE>", header.copy(msg_type = IncomingMessageType),
parentHeader, Metadata(), "<CONTENT>")
private val outgoingKernelMessage: KernelMessage = KernelMessage(Seq("<ID>".getBytes),
"<SIGNATURE>", header.copy(msg_type = OutgoingMessageType),
incomingKernelMessage.header, Metadata(), "<CONTENT>")
private val incomingZmqStrings = "1" :: "2" :: "3" :: "4" :: Nil
private var actorLoader: ActorLoader = _
private var signatureProbe: TestProbe = _
private var signatureSelection: ActorSelection = _
private var captureProbe: TestProbe = _
private var captureSelection: ActorSelection = _
private var relayWithoutSignatureManager: ActorRef = _
private var relayWithSignatureManager: ActorRef = _
override def beforeEach(): Unit = {
// Create a mock ActorLoader for the Relay we are going to test
actorLoader = mock[ActorLoader]
// Create a probe for the signature manager and mock the ActorLoader to
// return the associated ActorSelection
signatureProbe = TestProbe()
signatureSelection = system.actorSelection(signatureProbe.ref.path.toString)
// Create a probe to capture output from the relay for testing
captureProbe = TestProbe()
captureSelection = system.actorSelection(captureProbe.ref.path.toString)
relayWithoutSignatureManager = system.actorOf(Props(
classOf[KernelMessageRelay], actorLoader, false,
mock[Map[String, String]], mock[Map[String, String]]
relayWithSignatureManager = system.actorOf(Props(
classOf[KernelMessageRelay], actorLoader, true,
mock[Map[String, String]], mock[Map[String, String]]
describe("Relay") {
describe("#receive") {
describe("when not using the signature manager") {
it("should not send anything to SignatureManager for incoming") {
relayWithoutSignatureManager ! true // Mark as ready for incoming
relayWithoutSignatureManager ! incomingKernelMessage
it("should not send anything to SignatureManager for outgoing") {
relayWithoutSignatureManager ! outgoingKernelMessage
it("should relay KernelMessage for incoming") {
relayWithoutSignatureManager ! true // Mark as ready for incoming
relayWithoutSignatureManager !
((incomingZmqStrings, incomingKernelMessage))
captureProbe.expectMsg(MaxAkkaTestTimeout, incomingKernelMessage)
it("should relay KernelMessage for outgoing") {
relayWithoutSignatureManager ! outgoingKernelMessage
captureProbe.expectMsg(MaxAkkaTestTimeout, outgoingKernelMessage)
describe("when using the signature manager") {
it("should verify the signature if the message is incoming") {
relayWithSignatureManager ! true // Mark as ready for incoming
relayWithSignatureManager ! incomingKernelMessage
signatureProbe.expectMsg(MaxAkkaTestTimeout, incomingKernelMessage)
it("should construct the signature if the message is outgoing") {
relayWithSignatureManager ! outgoingKernelMessage
signatureProbe.expectMsg(MaxAkkaTestTimeout, outgoingKernelMessage)
describe("when not ready") {
it("should not relay the message if it is incoming") {
val incomingMessage: ZMQMessage = incomingKernelMessage
relayWithoutSignatureManager ! incomingMessage
it("should relay the message if it is outgoing") {
relayWithoutSignatureManager ! outgoingKernelMessage
captureProbe.expectMsg(MaxAkkaTestTimeout, outgoingKernelMessage)
describe("when ready") {
it("should relay the message if it is incoming") {
relayWithoutSignatureManager ! true // Mark as ready for incoming
relayWithoutSignatureManager !
((incomingZmqStrings, incomingKernelMessage))
captureProbe.expectMsg(MaxAkkaTestTimeout, incomingKernelMessage)
it("should relay the message if it is outgoing") {
relayWithoutSignatureManager ! true // Mark as ready for incoming
relayWithoutSignatureManager ! outgoingKernelMessage
captureProbe.expectMsg(MaxAkkaTestTimeout, outgoingKernelMessage)
describe("multiple messages in order"){
it("should relay messages in the order they were received") {
// Setup the base actor system and the relay
val actorLoader = mock[ActorLoader]
val kernelMessageRelay = system.actorOf(Props(
classOf[KernelMessageRelay], actorLoader, true,
mock[Map[String, String]], mock[Map[String, String]]
// Where all of the messages are relayed to, otherwise NPE
val captureProbe = TestProbe()
val captureSelection = system.actorSelection(captureProbe.ref.path.toString)
val n = 5
val chaoticPromise: Promise[String] = Promise()
var actual : List[String] = List()
val expected = (0 until n).map(_.toString).toList
// setup a ChaoticActor to accumulate message values
// A promise succeeds after n messages have been accumulated
val chaoticActor: ActorRef = system.actorOf(Props(
(paramVal: Any) => {
val tuple = paramVal.asInstanceOf[(String, Seq[_])]
actual = actual :+ tuple._1
if (actual.length == n) chaoticPromise.success("Done")
kernelMessageRelay ! true
// Sends messages with contents = to values of increasing counter
sendKernelMessages(n, kernelMessageRelay)
// Message values should be accumulated in the proper order
PatienceConfiguration.Interval(MaxAkkaTestInterval)) {
case _: String =>
actual should be(expected)
def sendKernelMessages(n: Int, kernelMessageRelay: ActorRef): Unit ={
// Sends n messages to the relay
(0 until n).foreach (i => {
val km = KernelMessage(Seq("<ID>".getBytes), s"${i}",
header.copy(msg_type = IncomingMessageType), parentHeader,
Metadata(), s"${i}")
kernelMessageRelay ! Tuple2(Seq("SomeString"), km)
case class ChaoticActor[U](receiveFunc : Any => U) extends Actor {
override def receive: Receive = {
case fVal: Any =>
// The test actor system runs the actors on a single thread, so we must
// simulate asynchronous behaviour by staring a new thread
val promise = Promise[U]()
promise.future pipeTo sender
new Thread(new Runnable {
override def run(): Unit = {
Thread.sleep(Random.nextInt(30) * 10)