blob: 7c6dae77e068025220b4a920e624a2c5f7354f9d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package org.apache.toree.kernel.protocol.v5.handler
import java.io.OutputStream
import java.util.concurrent.atomic.AtomicInteger
import org.apache.pekko.actor._
import org.apache.pekko.testkit.{ImplicitSender, TestKit, TestProbe}
import org.apache.toree.kernel.api.{FactoryMethods, Kernel}
import org.apache.toree.kernel.protocol.v5._
import org.apache.toree.kernel.protocol.v5.content._
import org.apache.toree.kernel.protocol.v5.kernel.ActorLoader
import org.apache.toree.kernel.protocol.v5Test._
import org.scalatestplus.mockito.MockitoSugar
import org.scalatest.funspec.AnyFunSpecLike
import org.scalatest.matchers.should.Matchers
import org.scalatest.BeforeAndAfterEach
import play.api.libs.json.Json
import org.mockito.Mockito._
import org.mockito.ArgumentMatchers._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import test.utils.MaxAkkaTestTimeout
class ExecuteRequestHandlerSpec extends TestKit(
ActorSystem(
"ExecuteRequestHandlerSpec",
None,
Some(org.apache.toree.Main.getClass.getClassLoader)
)
) with ImplicitSender with AnyFunSpecLike with Matchers with MockitoSugar
with BeforeAndAfterEach {
private var mockActorLoader: ActorLoader = _
private var mockFactoryMethods: FactoryMethods = _
private var mockKernel: Kernel = _
private var mockOutputStream: OutputStream = _
private var handlerActor: ActorRef = _
private var kernelMessageRelayProbe: TestProbe = _
private var executeRequestRelayProbe: TestProbe = _
private var statusDispatchProbe: TestProbe = _
override def beforeEach(): Unit = {
mockActorLoader = mock[ActorLoader]
mockFactoryMethods = mock[FactoryMethods]
mockKernel = mock[Kernel]
mockOutputStream = mock[OutputStream]
doReturn(mockFactoryMethods, Nil: _*).when(mockKernel)
.factory(any[KernelMessage], any[KMBuilder])
doReturn(mockOutputStream, Nil: _*).when(mockFactoryMethods)
.newKernelOutputStream(anyString(), anyBoolean())
// Add our handler and mock interpreter to the actor system
handlerActor = system.actorOf(Props(
classOf[ExecuteRequestHandler],
mockActorLoader,
mockKernel
))
kernelMessageRelayProbe = new TestProbe(system)
when(mockActorLoader.load(SystemActorType.KernelMessageRelay))
.thenReturn(system.actorSelection(kernelMessageRelayProbe.ref.path.toString))
executeRequestRelayProbe = new TestProbe(system)
when(mockActorLoader.load(SystemActorType.ExecuteRequestRelay))
.thenReturn(system.actorSelection(executeRequestRelayProbe.ref.path.toString))
statusDispatchProbe = new TestProbe(system)
when(mockActorLoader.load(SystemActorType.StatusDispatch))
.thenReturn(system.actorSelection(statusDispatchProbe.ref.path.toString))
}
/**
* This method simulates the interpreter passing back an
* execute result and reply.
*/
def replyToHandlerWithOkAndResult() = {
// This stubs the behaviour of the interpreter executing code
val expectedClass = classOf[(ExecuteRequest, KernelMessage, OutputStream)]
executeRequestRelayProbe.expectMsgClass(expectedClass)
executeRequestRelayProbe.reply((
ExecuteReplyOk(1, Some(Payloads()), Some(UserExpressions())),
ExecuteResult(1, Data("text/plain" -> "resulty result"), Metadata())
))
}
def replyToHandlerWithOk() = {
// This stubs the behaviour of the interpreter executing code
val expectedClass = classOf[(ExecuteRequest, KernelMessage, OutputStream)]
executeRequestRelayProbe.expectMsgClass(expectedClass)
executeRequestRelayProbe.reply((
ExecuteReplyOk(1, Some(Payloads()), Some(UserExpressions())),
ExecuteResult(1, Data("text/plain" -> ""), Metadata())
))
}
/**
* This method simulates the interpreter passing back an
* execute result and reply
*/
def replyToHandlerWithErrorAndResult() = {
// This stubs the behaviour of the interpreter executing code
val expectedClass = classOf[(ExecuteRequest, KernelMessage, OutputStream)]
executeRequestRelayProbe.expectMsgClass(expectedClass)
executeRequestRelayProbe.reply((
ExecuteReplyError(1, Some(""), Some(""), Some(Nil)),
ExecuteResult(1, Data("text/plain" -> "resulty result"), Metadata())
))
}
describe("ExecuteRequestHandler( ActorLoader )") {
describe("#receive( KernelMessage ) when interpreter replies") {
it("should send an execute result message if the result is not empty") {
handlerActor ! MockExecuteRequestKernelMessage
replyToHandlerWithOkAndResult()
kernelMessageRelayProbe.fishForMessage(MaxAkkaTestTimeout) {
case KernelMessage(_, _, header, _, _, _) =>
header.msg_type == ExecuteResult.toTypeString
}
}
it("should not send an execute result message if there is no result") {
handlerActor ! MockExecuteRequestKernelMessage
replyToHandlerWithOk()
kernelMessageRelayProbe.fishForMessage(MaxAkkaTestTimeout) {
case KernelMessage(_, _, header, _, _, _) =>
header.msg_type != ExecuteResult.toTypeString
}
}
it("should send an execute reply message") {
handlerActor ! MockExecuteRequestKernelMessage
replyToHandlerWithOkAndResult()
kernelMessageRelayProbe.fishForMessage(MaxAkkaTestTimeout) {
case KernelMessage(_, _, header, _, _, _) =>
header.msg_type == ExecuteResult.toTypeString
}
}
it("should send a status idle message after the reply and result") {
handlerActor ! MockExecuteRequestKernelMessage
replyToHandlerWithOkAndResult()
val msgCount = new AtomicInteger(0)
var statusMsgNum = -1
var statusReceived = false
val f1 = Future {
kernelMessageRelayProbe.fishForMessage(MaxAkkaTestTimeout) {
case KernelMessage(_, _, header, _, _, _) =>
if (header.msg_type == ExecuteResult.toTypeString &&
!statusReceived)
msgCount.incrementAndGet()
else if (header.msg_type == ExecuteReply.toTypeString &&
!statusReceived)
msgCount.incrementAndGet()
statusReceived || (msgCount.get() >= 2)
}
}
val f2 = Future {
statusDispatchProbe.fishForMessage(MaxAkkaTestTimeout) {
case (status, header) =>
if (status == KernelStatusIdle.toString)
statusReceived = true
statusMsgNum = msgCount.get()
statusReceived || (msgCount.get() >= 2)
}
}
val fs = f1.zip(f2)
Await.ready(fs, 3 * MaxAkkaTestTimeout)
statusMsgNum should equal(2)
}
it("should send an execute input message") {
handlerActor ! MockExecuteRequestKernelMessage
kernelMessageRelayProbe.fishForMessage(MaxAkkaTestTimeout) {
case KernelMessage(_, _, header, _, _, _) =>
header.msg_type == ExecuteInput.toTypeString
}
}
it("should send a message with ids equal to the incoming " +
"KernelMessage's ids") {
handlerActor ! MockExecuteRequestKernelMessage
kernelMessageRelayProbe.fishForMessage(MaxAkkaTestTimeout) {
case KernelMessage(ids, _, _, _, _, _) =>
ids == MockExecuteRequestKernelMessage.ids
}
}
it("should send a message with parent header equal to the incoming " +
"KernelMessage's header") {
handlerActor ! MockExecuteRequestKernelMessage
kernelMessageRelayProbe.fishForMessage(MaxAkkaTestTimeout) {
case KernelMessage(_, _, _, parentHeader, _, _) =>
parentHeader == MockExecuteRequestKernelMessage.header
}
}
// TODO: Investigate if this is still relevant at all
// it("should send a status busy and idle message") {
// handlerActor ! MockExecuteRequestKernelMessage
// replyToHandlerWithOkAndResult()
// var busy = false
// var idle = false
//
// statusDispatchProbe.receiveWhile(100.milliseconds) {
// case Tuple2(status: KernelStatusType, header: Header)=>
// if(status == KernelStatusType.Busy)
// busy = true
// if(status == KernelStatusType.Idle)
// idle = true
// }
//
// idle should be (true)
// busy should be (true)
// }
}
}
// Testing error timeout for interpreter future
describe("ExecuteRequestHandler( ActorLoader )") {
describe("#receive( KernelMessage with bad JSON content )"){
it("should respond with an execute_reply with status error") {
handlerActor ! MockKernelMessageWithBadExecuteRequest
kernelMessageRelayProbe.fishForMessage(MaxAkkaTestTimeout) {
// Only mark as successful if this specific message was received
case KernelMessage(_, _, header, _, _, contentString)
if header.msg_type == ExecuteReply.toTypeString =>
val reply = Json.parse(contentString).as[ExecuteReply]
reply.status == "error"
case _ => false
}
}
it("should send error message to relay") {
handlerActor ! MockKernelMessageWithBadExecuteRequest
kernelMessageRelayProbe.fishForMessage(MaxAkkaTestTimeout) {
// Only mark as successful if this specific message was received
case KernelMessage(_, _, header, _, _, _)
if header.msg_type == ErrorContent.toTypeString => true
case _ => false
}
}
// TODO: Investigate if this is still relevant at all
// it("should send a status idle message") {
// handlerActor ! MockKernelMessageWithBadExecuteRequest
// var busy = false
// var idle = false
//
// statusDispatchProbe.receiveWhile(100.milliseconds) {
// case Tuple2(status: KernelStatusType, header: Header)=>
// if(status == KernelStatusType.Busy)
// busy = true
// if(status == KernelStatusType.Idle)
// idle = true
// }
//
// idle should be (true)
// busy should be (false)
// }
}
}
}