blob: c801c0e14d069c6acb23b84568b2b581b294ae44 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package org.apache.toree.kernel.protocol.v5.stream
import java.util.UUID
import akka.actor.{ActorSelection, ActorSystem}
import akka.testkit.{TestKit, TestProbe}
import org.apache.toree.Main
import org.apache.toree.kernel.protocol.v5._
import org.apache.toree.kernel.protocol.v5.kernel.ActorLoader
import org.apache.toree.utils.ScheduledTaskManager
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest._
import play.api.libs.json._
import org.apache.toree.kernel.protocol.v5.content.StreamContent
import test.utils.MaxAkkaTestTimeout
class KernelOuputStreamSpec
extends TestKit(ActorSystem("KernelOutputStreamActorSystem", None, Some(Main.getClass.getClassLoader)))
with FunSpecLike with Matchers with GivenWhenThen with BeforeAndAfter
with MockitoSugar
{
private var mockActorLoader: ActorLoader = _
private var mockScheduledTaskManager: MockScheduledTaskManager = _
private var kernelOutputRelayProbe: TestProbe = _
//
// SHARED ELEMENTS BETWEEN TESTS
//
private val GeneratedTaskId = UUID.randomUUID().toString
private val skeletonBuilder = KMBuilder()
.withIds(Nil).withSignature("").withContentString("")
.withParentHeader(Header("", "", "", "", "5.0"))
/**
* This stubs out the methods of the scheduled task manager and provides a
* form of verification, which is not (easily) doable with Mockito due to the
* call-by-name argument in addTask.
*/
private class MockScheduledTaskManager extends ScheduledTaskManager {
private var addTaskCalled = false
private var removeTaskCalled = false
private var stopCalled = false
def verifyAddTaskCalled(): Unit = addTaskCalled should be (true)
def verifyRemoveTaskCalled(): Unit = removeTaskCalled should be (true)
def verifyStopCalled(): Unit = stopCalled should be (true)
def verifyAddTaskNotCalled(): Unit = addTaskCalled should be (false)
def verifyRemoveTaskNotCalled(): Unit = removeTaskCalled should be (false)
def verifyStopNotCalled(): Unit = stopCalled should be (false)
def resetVerify(): Unit = {
addTaskCalled = false
removeTaskCalled = false
stopCalled = false
}
override def addTask[T](executionDelay: Long, timeInterval: Long, task: => T): String =
{ addTaskCalled = true; GeneratedTaskId }
override def removeTask(taskId: String): Boolean =
{ removeTaskCalled = true; true }
override def stop(): Unit = stopCalled = true
def teardown(): Unit = super.stop()
}
before {
// Create a mock ActorLoader for the KernelOutputStream we are testing
mockActorLoader = mock[ActorLoader]
mockScheduledTaskManager = new MockScheduledTaskManager
// Create a probe for the relay and mock the ActorLoader to return the
// associated ActorSelection
kernelOutputRelayProbe = TestProbe()
val kernelOutputRelaySelection: ActorSelection =
system.actorSelection(kernelOutputRelayProbe.ref.path.toString)
doReturn(kernelOutputRelaySelection)
.when(mockActorLoader).load(SystemActorType.KernelMessageRelay)
}
after {
mockScheduledTaskManager.teardown()
}
describe("KernelOutputStream") {
describe("#write(Int)") {
it("should add a new byte to the internal list") {
Given("a kernel output stream with a skeleton kernel builder")
val kernelOutputStream = new KernelOutputStream(
mockActorLoader, skeletonBuilder, mockScheduledTaskManager
)
When("a byte is written to the stream")
val expected = 'a'
kernelOutputStream.write(expected)
Then("it should be appended to the internal list")
kernelOutputStream.flush()
val message = kernelOutputRelayProbe
.receiveOne(MaxAkkaTestTimeout).asInstanceOf[KernelMessage]
val executeResult = Json.parse(message.contentString).as[StreamContent]
executeResult.text should be (expected.toString)
}
it("should enable periodic flushing") {
Given("a kernel output stream with a skeleton kernel builder")
val kernelOutputStream = new KernelOutputStream(
mockActorLoader, skeletonBuilder, mockScheduledTaskManager
)
When("a byte is written to the stream")
val expected = 'a'
kernelOutputStream.write(expected)
Then("it should add a task to periodically flush")
mockScheduledTaskManager.verifyAddTaskCalled()
}
it("should not enable periodic flushing if already enabled") {
Given("a kernel output stream with a skeleton kernel builder")
val kernelOutputStream = new KernelOutputStream(
mockActorLoader, skeletonBuilder, mockScheduledTaskManager
)
And("periodic flushing is already enabled")
kernelOutputStream.write('a')
mockScheduledTaskManager.verifyAddTaskCalled()
mockScheduledTaskManager.resetVerify()
When("a byte is written to the stream")
kernelOutputStream.write('b')
Then("it should not add a task to periodically flush")
mockScheduledTaskManager.verifyAddTaskNotCalled()
}
}
describe("#flush") {
it("should disable periodic flushing") {
Given("a kernel output stream with a skeleton kernel builder")
val kernelOutputStream = new KernelOutputStream(
mockActorLoader, skeletonBuilder, mockScheduledTaskManager
)
When("a byte is written to the stream")
val expected = 'a'
kernelOutputStream.write(expected)
And("flush is invoked")
kernelOutputStream.flush()
Then("it should remove the task to periodically flush")
mockScheduledTaskManager.verifyRemoveTaskCalled()
}
it("should not disable periodic flushing if not enabled") {
Given("a kernel output stream with a skeleton kernel builder")
val kernelOutputStream = new KernelOutputStream(
mockActorLoader, skeletonBuilder, mockScheduledTaskManager
)
When("flush is invoked")
kernelOutputStream.flush()
Then("it should not remove the task to periodically flush")
mockScheduledTaskManager.verifyRemoveTaskNotCalled()
}
it("should not send empty (whitespace) messages if flag is false") {
Given("a kernel output stream with send empty output set to false")
val kernelOutputStream = new KernelOutputStream(
mockActorLoader, skeletonBuilder, mockScheduledTaskManager,
sendEmptyOutput = false
)
When("whitespace is created and flushed")
val expected = "\r \r \n \t"
kernelOutputStream.write(expected.getBytes)
kernelOutputStream.flush()
Then("no message should be sent")
kernelOutputRelayProbe.expectNoMessage(MaxAkkaTestTimeout)
}
it("should send empty (whitespace) messages if flag is true") {
Given("a kernel output stream with send empty output set to false")
val kernelOutputStream = new KernelOutputStream(
mockActorLoader, skeletonBuilder, mockScheduledTaskManager,
sendEmptyOutput = true
)
When("whitespace is created and flushed")
val expected = "\r \r \n \t"
kernelOutputStream.write(expected.getBytes)
kernelOutputStream.flush()
Then("the whitespace message should have been sent")
val message = kernelOutputRelayProbe
.receiveOne(MaxAkkaTestTimeout).asInstanceOf[KernelMessage]
val actual = Json.parse(message.contentString).as[StreamContent].text
actual should be (expected)
}
it("should set the ids of the kernel message") {
Given("a kernel output stream with a skeleton kernel builder")
val kernelOutputStream = new KernelOutputStream(
mockActorLoader, skeletonBuilder, mockScheduledTaskManager
)
When("a string is written as the result and flushed")
val expected = "some string"
kernelOutputStream.write(expected.getBytes)
kernelOutputStream.flush()
Then("the ids should be set to execute_result")
val message = kernelOutputRelayProbe
.receiveOne(MaxAkkaTestTimeout).asInstanceOf[KernelMessage]
message.ids(0).deep should equal (MessageType.Outgoing.Stream.toString.getBytes.deep)
}
it("should set the message type in the header of the kernel message to an execute_result") {
Given("a kernel output stream with a skeleton kernel builder")
val kernelOutputStream = new KernelOutputStream(
mockActorLoader, skeletonBuilder, mockScheduledTaskManager
)
When("a string is written as the result and flushed")
val expected = "some string"
kernelOutputStream.write(expected.getBytes)
kernelOutputStream.flush()
Then("the msg_type in the header should be execute_result")
val message = kernelOutputRelayProbe
.receiveOne(MaxAkkaTestTimeout).asInstanceOf[KernelMessage]
message.header.msg_type should be (MessageType.Outgoing.Stream.toString)
}
it("should set the content string of the kernel message") {
Given("a kernel output stream with a skeleton kernel builder")
val kernelOutputStream = new KernelOutputStream(
mockActorLoader, skeletonBuilder, mockScheduledTaskManager
)
When("a string is written as the result and flushed")
val expected = "some string"
kernelOutputStream.write(expected.getBytes)
kernelOutputStream.flush()
Then("the content string should have text/plain set to the string")
val message = kernelOutputRelayProbe
.receiveOne(MaxAkkaTestTimeout).asInstanceOf[KernelMessage]
val executeResult = Json.parse(message.contentString).as[StreamContent]
executeResult.text should be (expected)
}
}
}
}