blob: 84d35080ae821f5eedf7ee42d6e39586fb336f39 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.toree.kernel.protocol.v5.stream
import org.apache.pekko.actor.{Actor, ActorRef, ActorSystem}
import org.apache.pekko.testkit.{TestActorRef, TestKit, TestProbe}
import org.apache.toree.Main
import org.apache.toree.kernel.protocol.v5._
import org.apache.toree.kernel.protocol.v5.content.InputRequest
import org.apache.toree.kernel.protocol.v5.kernel.ActorLoader
import org.mockito.Mockito._
import org.scalatest._
import org.scalatest.funspec.AnyFunSpecLike
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar
import play.api.libs.json.Json
import test.utils._
/**
 * Tests for [[KernelInputStream]].
 *
 * Each test gets a fresh stream wired to a mocked [[ActorLoader]] whose
 * `InputReply` lookup resolves to a fake handler actor. That actor forwards
 * every [[KernelMessage]] it receives to a [[TestProbe]] (so tests can inspect
 * outgoing requests) and answers input requests with `TestReplyString`.
 */
class KernelInputStreamSpec
  extends TestKit(ActorSystem("KernelInputStreamActorSystem", None, Some(Main.getClass.getClassLoader)))
  with AnyFunSpecLike with Matchers with GivenWhenThen with BeforeAndAfterEach
  with BeforeAndAfterAll with MockitoSugar
{
  // Fixtures are rebuilt in beforeEach so that no buffered input or probe
  // messages leak from one test into the next.
  private var mockActorLoader: ActorLoader = _
  private var mockKMBuilder: KMBuilder = _
  private var kernelInputOutputHandlerProbe: TestProbe = _
  private var kernelInputStream: KernelInputStream = _
  private var fakeInputOutputHandlerActor: ActorRef = _

  /** Canned reply the fake handler sends back for every input request. */
  private val TestReplyString = "some reply"

  /** Builds a fresh stream, probe and fake handler actor for each test. */
  override def beforeEach(): Unit = {
    mockActorLoader = mock[ActorLoader]
    mockKMBuilder = KMBuilder() // No need to really mock this
    kernelInputStream = new KernelInputStream(mockActorLoader, mockKMBuilder)

    kernelInputOutputHandlerProbe = TestProbe()
    fakeInputOutputHandlerActor = TestActorRef(new Actor {
      override def receive: Receive = {
        // Handle case for getting an input_request
        case kernelMessage: KernelMessage =>
          val messageType = kernelMessage.header.msg_type

          // Forward everything to the probe so tests can assert on it
          kernelInputOutputHandlerProbe.ref ! kernelMessage

          // Only input requests receive the canned reply
          if (messageType == MessageType.Outgoing.InputRequest.toString)
            sender() ! TestReplyString
      }
    })

    // Add the actor that routes to our test probe and responds with a fake
    // set of data
    doReturn(system.actorSelection(fakeInputOutputHandlerActor.path.toString), Nil: _*)
      .when(mockActorLoader).load(MessageType.Incoming.InputReply)

    // Allow time for the actors to start. This avoids read() hanging forever
    // when running tests in gradle.
    Thread.sleep(100)
  }

  /**
   * Terminates the actor system owned by this TestKit once all tests have
   * run, so its threads do not outlive the suite.
   */
  override def afterAll(): Unit = {
    try TestKit.shutdownActorSystem(system)
    finally super.afterAll()
  }

  describe("KernelInputStream") {
    describe("#available") {
      it("should be zero when no input has been read") {
        kernelInputStream.available() should be (0)
      }

      it("should match the bytes remaining internally") {
        // One byte of the reply is consumed by this read
        kernelInputStream.read()

        kernelInputStream.available() should be (TestReplyString.length - 1)
      }
    }

    describe("#read") {
      it("should send a request for more data if the buffer is empty") {
        // Fresh input stream has nothing in its buffer
        kernelInputStream.read()

        // Verify that a message was sent out requesting data
        kernelInputOutputHandlerProbe.expectMsgPF(MaxAkkaTestTimeout) {
          case KernelMessage(_, _, header, _, _, _)
            if header.msg_type == MessageType.Outgoing.InputRequest.toString =>
              true
        }
      }

      it("should use the provided prompt in its requests") {
        val expected = KernelInputStream.DefaultPrompt

        // Fresh input stream has nothing in its buffer
        kernelInputStream.read()

        // Verify that a message was sent out requesting data with the
        // specific prompt
        kernelInputOutputHandlerProbe.expectMsgPF(MaxAkkaTestTimeout) {
          case KernelMessage(_, _, header, _, _, contentString)
            if header.msg_type == MessageType.Outgoing.InputRequest.toString =>
              Json.parse(contentString).as[InputRequest].prompt should be (expected)
        }
      }

      it("should use the provided password flag in its requests") {
        val expected = KernelInputStream.DefaultPassword

        // Fresh input stream has nothing in its buffer
        kernelInputStream.read()

        // Verify that a message was sent out requesting data with the
        // specific password flag
        kernelInputOutputHandlerProbe.expectMsgPF(MaxAkkaTestTimeout) {
          case KernelMessage(_, _, header, _, _, contentString)
            if header.msg_type == MessageType.Outgoing.InputRequest.toString =>
              Json.parse(contentString).as[InputRequest].password should be (expected)
        }
      }

      it("should return the next byte from the current buffer") {
        kernelInputStream.read() should be (TestReplyString.head)
      }

      it("should not send a request for more data if data is in the buffer") {
        // Run read for length of message (avoiding sending out a second
        // request)
        val readLength = TestReplyString.length

        for (i <- 1 to readLength)
          kernelInputStream.read() should be (TestReplyString.charAt(i - 1))

        // Exactly one request should have gone out: the first read fills the
        // buffer and the remaining reads are served from it
        kernelInputOutputHandlerProbe.expectMsgClass(MaxAkkaTestTimeout, classOf[KernelMessage])
        kernelInputOutputHandlerProbe.expectNoMessage(MaxAkkaTestTimeout)
      }
    }
  }
}