| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License |
| */ |
| |
| package org.apache.toree.kernel.protocol.v5.client.socket |
| |
| import akka.actor.{ActorRef, Props, ActorSystem} |
| import akka.testkit.{TestProbe, ImplicitSender, TestKit} |
| import org.apache.toree.communication.ZMQMessage |
| import org.apache.toree.communication.security.SecurityActorType |
| import org.apache.toree.kernel.protocol.v5._ |
| import org.apache.toree.kernel.protocol.v5.client.ActorLoader |
| import org.apache.toree.kernel.protocol.v5.client.socket.StdinClient.{ResponseFunctionMessage, ResponseFunction} |
| import org.apache.toree.kernel.protocol.v5.content.{InputReply, InputRequest, ClearOutput, ExecuteRequest} |
| import org.scalatest.mock.MockitoSugar |
| import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers} |
| import org.apache.toree.kernel.protocol.v5.client.Utilities._ |
| import play.api.libs.json.Json |
| import scala.concurrent.duration._ |
| |
| import org.mockito.Mockito._ |
| import org.mockito.Matchers._ |
| |
| class StdinClientSpec extends TestKit(ActorSystem("StdinActorSpec")) |
| with ImplicitSender with FunSpecLike with Matchers with MockitoSugar |
| with BeforeAndAfter |
| { |
| private val SignatureEnabled = true |
| private val TestReplyString = "some value" |
| private val TestResponseFunc: ResponseFunction = (_, _) => TestReplyString |
| |
| private var mockSocketFactory: SocketFactory = _ |
| private var mockActorLoader: ActorLoader = _ |
| private var signatureManagerProbe: TestProbe = _ |
| private var socketProbe: TestProbe = _ |
| private var stdinClient: ActorRef = _ |
| |
| before { |
| socketProbe = TestProbe() |
| signatureManagerProbe = TestProbe() |
| mockSocketFactory = mock[SocketFactory] |
| mockActorLoader = mock[ActorLoader] |
| doReturn(system.actorSelection(signatureManagerProbe.ref.path.toString)) |
| .when(mockActorLoader).load(SecurityActorType.SignatureManager) |
| doReturn(socketProbe.ref).when(mockSocketFactory) |
| .StdinClient(any[ActorSystem], any[ActorRef]) |
| |
| stdinClient = system.actorOf(Props( |
| classOf[StdinClient], mockSocketFactory, mockActorLoader, SignatureEnabled |
| )) |
| |
| // Set the response function for our client socket |
| stdinClient ! ResponseFunctionMessage(TestResponseFunc) |
| } |
| |
| describe("StdinClient") { |
| describe("#receive") { |
| it("should update the response function if receiving a new one") { |
| val expected = "some other value" |
| val replacementFunc: ResponseFunction = (_, _) => expected |
| |
| // Update the function |
| stdinClient ! ResponseFunctionMessage(replacementFunc) |
| |
| val inputRequestMessage: ZMQMessage = KMBuilder() |
| .withHeader(InputRequest.toTypeString) |
| .withContentString(InputRequest("", false)) |
| .build |
| |
| stdinClient ! inputRequestMessage |
| |
| // Echo back the kernel message sent to have a signature injected |
| signatureManagerProbe.expectMsgPF() { |
| case kernelMessage: KernelMessage => |
| signatureManagerProbe.reply(kernelMessage) |
| true |
| } |
| |
| socketProbe.expectMsgPF() { |
| case zmqMessage: ZMQMessage => |
| val kernelMessage: KernelMessage = zmqMessage |
| val inputReply = |
| Json.parse(kernelMessage.contentString).as[InputReply] |
| inputReply.value should be (expected) |
| } |
| } |
| |
| it("should do nothing if the incoming message is not an input_request") { |
| val notInputRequestMessage: ZMQMessage = KMBuilder() |
| .withHeader(ClearOutput.toTypeString) |
| .build |
| |
| stdinClient ! notInputRequestMessage |
| |
| socketProbe.expectNoMessage(300.milliseconds) |
| } |
| |
| it("should respond with an input_reply if the incoming message is " + |
| "an input_request") { |
| val inputRequestMessage: ZMQMessage = KMBuilder() |
| .withHeader(InputRequest.toTypeString) |
| .withContentString(InputRequest("", false)) |
| .build |
| |
| stdinClient ! inputRequestMessage |
| |
| // Echo back the kernel message sent to have a signature injected |
| signatureManagerProbe.expectMsgPF() { |
| case kernelMessage: KernelMessage => |
| signatureManagerProbe.reply(kernelMessage) |
| true |
| } |
| |
| socketProbe.expectMsgPF() { |
| case zmqMessage: ZMQMessage => |
| val kernelMessage: KernelMessage = zmqMessage |
| val messageType = kernelMessage.header.msg_type |
| messageType should be (InputReply.toTypeString) |
| } |
| } |
| |
| it("should use the result from the response function if the incoming " + |
| "message is an input_request") { |
| val inputRequestMessage: ZMQMessage = KMBuilder() |
| .withHeader(InputRequest.toTypeString) |
| .withContentString(InputRequest("", false)) |
| .build |
| |
| stdinClient ! inputRequestMessage |
| |
| // Echo back the kernel message sent to have a signature injected |
| signatureManagerProbe.expectMsgPF() { |
| case kernelMessage: KernelMessage => |
| signatureManagerProbe.reply(kernelMessage) |
| true |
| } |
| |
| socketProbe.expectMsgPF() { |
| case zmqMessage: ZMQMessage => |
| val kernelMessage: KernelMessage = zmqMessage |
| val inputReply = |
| Json.parse(kernelMessage.contentString).as[InputReply] |
| inputReply.value should be (TestReplyString) |
| } |
| } |
| } |
| } |
| } |