package org.apache.toree.kernel.protocol.v5.handler
import java.util.UUID
import{Props, ActorRef, ActorSystem}
import akka.testkit.{TestProbe, ImplicitSender, TestKit}
import org.apache.toree.kernel.protocol.v5
import org.apache.toree.kernel.protocol.v5.content.{ClearOutput, CommClose}
import org.apache.toree.kernel.protocol.v5.kernel.ActorLoader
import org.apache.toree.kernel.protocol.v5.{KernelMessage, SystemActorType, KMBuilder}
import org.apache.toree.comm.{CommRegistrar, CommWriter, CommCallbacks, CommStorage}
import org.scalatestplus.mockito.MockitoSugar
import org.scalatest.funspec.AnyFunSpecLike
import org.scalatest.matchers.should.Matchers
import org.scalatest.BeforeAndAfterEach
import org.mockito.Mockito._
import org.mockito.ArgumentMatchers._
import test.utils.MaxAkkaTestTimeout
class CommCloseHandlerSpec extends TestKit(
) with ImplicitSender with AnyFunSpecLike with Matchers with MockitoSugar
with BeforeAndAfterEach
private val TestCommId = UUID.randomUUID().toString
private var kmBuilder: KMBuilder = _
private var spyCommStorage: CommStorage = _
private var mockCommCallbacks: CommCallbacks = _
private var mockCommRegistrar: CommRegistrar = _
private var mockActorLoader: ActorLoader = _
private var commCloseHandler: ActorRef = _
private var kernelMessageRelayProbe: TestProbe = _
private var statusDispatchProbe: TestProbe = _
override def beforeEach(): Unit = {
kmBuilder = KMBuilder()
mockCommCallbacks = mock[CommCallbacks]
spyCommStorage = spy[CommStorage](new CommStorage())
mockCommRegistrar = mock[CommRegistrar]
mockActorLoader = mock[ActorLoader]
commCloseHandler = system.actorOf(Props(
mockActorLoader, mockCommRegistrar, spyCommStorage
// Used to intercept responses
kernelMessageRelayProbe = TestProbe()
// Used to intercept busy/idle messages
statusDispatchProbe = new TestProbe(system)
describe("CommCloseHandler") {
describe("#process") {
it("should execute close callbacks if the id is registered") {
// Mark our id as registered
doReturn(Some(mockCommCallbacks), Nil: _*).when(spyCommStorage)
// Send a comm_open message with the test target
commCloseHandler ! kmBuilder
.withContentString(CommClose(TestCommId, v5.MsgData.Empty))
// Should receive a busy and an idle message
statusDispatchProbe.receiveN(2, MaxAkkaTestTimeout)
// Verify that the msg callbacks were triggered along the way
any[CommWriter], any[v5.UUID], any[v5.MsgData])
it("should not execute close callbacks if the id is not registered") {
// Mark our target as not registered
doReturn(None, Nil: _*).when(spyCommStorage).getCommIdCallbacks(TestCommId)
// Send a comm_msg message with the test id
commCloseHandler ! kmBuilder
.withContentString(CommClose(TestCommId, v5.MsgData.Empty))
// Should receive a busy and an idle message
statusDispatchProbe.receiveN(2, MaxAkkaTestTimeout)
// Verify that the msg callbacks were NOT triggered along the way
verify(mockCommCallbacks, never()).executeCloseCallbacks(
any[CommWriter], any[v5.UUID], any[v5.MsgData])
it("should do nothing if there is a parsing error") {
// Send a comm_open message with an invalid content string
commCloseHandler ! kmBuilder
.withContentString(ClearOutput(_wait = true))
// TODO: Is there a better way to test for this without an upper time
// limit? Is there a different logical approach?
it("should include the parent's header in the parent header of " +
"outgoing messages"){
// Register a callback that sends a message using the comm writer
val closeCallback: CommCallbacks.CloseCallback =
new CommCallbacks.CloseCallback() {
def apply(v1: CommWriter, v2: v5.UUID, v4: v5.MsgData) =
val callbacks = (new CommCallbacks).addCloseCallback(closeCallback)
doReturn(Some(callbacks), Nil: _*).when(spyCommStorage)
// Send a comm close message
val msg = kmBuilder
.withContentString(CommClose(TestCommId, v5.MsgData.Empty))
commCloseHandler ! msg
// Verify that the message sent by the handler has the desired property
kernelMessageRelayProbe.fishForMessage(MaxAkkaTestTimeout) {
case KernelMessage(_, _, _, parentHeader, _, _) =>
parentHeader == msg.header