blob: 955529278aeffe72cae285e4d7590a430128fe8c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package system
import org.apache.pekko.testkit.{TestKit, TestProbe}
import org.apache.toree.communication.ZMQMessage
import org.apache.toree.kernel.protocol.v5
import org.apache.toree.kernel.protocol.v5.client.Utilities._
import org.apache.toree.kernel.protocol.v5.content._
import org.apache.toree.kernel.protocol.v5.{KMBuilder, KernelMessage, SocketType}
import org.scalatest.concurrent.Eventually
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.{Milliseconds, Seconds, Span}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funspec.AnyFunSpecLike
import org.scalatest.matchers.should.Matchers
import play.api.libs.json.Json
import test.utils.SparkClientDeployer
import scala.concurrent.duration._
/**
* Used for system-wide (entire Spark Kernel) tests relating to Comm usage.
*
* @note Do not use non-generated target names or Comm ids! This is a shared
* kernel instance (to avoid death by slowness) and it is not guaranteed
* that previous tests will do proper cleanup!
*/
class ClientCommSpecForSystem
extends TestKit(SparkClientDeployer.getClientActorSystem)
with AnyFunSpecLike with Matchers with BeforeAndAfterAll with Eventually {
private val MaxFishTime = 2.seconds
implicit override val patienceConfig = PatienceConfig(
timeout = scaled(Span(2, Seconds)),
interval = scaled(Span(5, Milliseconds))
)
import test.utils.SparkClientDeployer._
private def waitForExecuteReply(
shell: TestProbe, headerId: v5.UUID, maxTime: Duration = MaxFishTime
): Unit =
shell.fishForMessage(maxTime) {
case KernelMessage(_, _, header, parentHeader, _, _)
if header.msg_type == ExecuteReply.toTypeString &&
parentHeader.msg_id == headerId=> true
case _ => false
}
// Not using TestCommId to avoid problems if previous comm using that
// id in tests was not properly closed
private def buildZMQCommOpen(
targetName: String, data: v5.MsgData,
commId: v5.UUID = java.util.UUID.randomUUID().toString
): (v5.UUID, ZMQMessage) = {
val kernelMessage = KMBuilder()
.withHeader(CommOpen.toTypeString)
.withContentString(CommOpen(
comm_id = commId,
target_name = targetName,
data = data
)).build
(kernelMessage.header.msg_id, kernelMessage)
}
// Not using TestCommId to avoid problems if previous comm using that
// id in tests was not properly closed
private def buildZMQCommMsg(commId: v5.UUID, data: v5.MsgData): (v5.UUID, ZMQMessage) = {
val kernelMessage = KMBuilder()
.withHeader(CommMsg.toTypeString)
.withContentString(CommMsg(
comm_id = commId,
data = data
)).build
(kernelMessage.header.msg_id, kernelMessage)
}
// Not using TestCommId to avoid problems if previous comm using that
// id in tests was not properly closed
private def buildZMQCommClose(commId: v5.UUID, data: v5.MsgData): (v5.UUID, ZMQMessage) = {
val kernelMessage = KMBuilder()
.withHeader(CommClose.toTypeString)
.withContentString(CommClose(
comm_id = commId,
data = data
)).build
(kernelMessage.header.msg_id, kernelMessage)
}
describe("Comm for Client System") {
describe("executing Comm API to open a new comm") {
it("should correctly send comm_open to socket") {
withSparkClient { (client, actorLoader, heartbeat, stdin, shell, ioPub) =>
val testTargetName = java.util.UUID.randomUUID().toString
client.comm.open(testTargetName)
// Should discover an outgoing kernel message for comm_open
shell.fishForMessage(MaxFishTime) {
case KernelMessage(_, _, header, _, _, _)
if header.msg_type == CommOpen.toTypeString => true
case _ => false
}
}
}
}
describe("executing Comm API to send a message") {
it("should correctly send comm_msg to socket") {
withSparkClient { (client, actorLoader, heartbeat, stdin, shell, ioPub) =>
val testTargetName = java.util.UUID.randomUUID().toString
val commWriter = client.comm.open(testTargetName)
commWriter.writeMsg(v5.MsgData("key" -> "value"))
// Should discover an outgoing kernel message for comm_msg
shell.fishForMessage(MaxFishTime) {
case KernelMessage(_, _, header, _, _, contentString)
if header.msg_type == CommMsg.toTypeString =>
(Json.parse(contentString).as[CommMsg].data \ "key").as[String] == "value"
case _ => false
}
}
}
}
describe("executing Comm API to close an existing comm") {
it("should correctly send comm_close to socket") {
withSparkClient { (client, actorLoader, heartbeat, stdin, shell, ioPub) =>
val testTargetName = java.util.UUID.randomUUID().toString
val commWriter = client.comm.open(testTargetName)
commWriter.close()
// Should discover an outgoing kernel message for comm_close
shell.fishForMessage(MaxFishTime) {
case KernelMessage(_, _, header, _, _, _)
if header.msg_type == CommClose.toTypeString => true
case _ => false
}
}
}
}
describe("receiving Comm API open from a kernel") {
it("should respond comm_close if the target is not found") {
withSparkClient { (client, actorLoader, heartbeat, stdin, shell, ioPub) =>
val testTargetName = java.util.UUID.randomUUID().toString
// Send a comm_open (as if a kernel did it)
val (_, message) = buildZMQCommOpen(testTargetName, v5.MsgData.Empty)
actorLoader.load(SocketType.IOPubClient) ! message
// Should discover an outgoing kernel message for comm_close
shell.fishForMessage(MaxFishTime) {
case KernelMessage(_, _, header, _, _, _)
if header.msg_type == CommClose.toTypeString => true
case _ => false
}
}
}
it("should not execute open callbacks if the target is not found") {
withSparkClient { (client, actorLoader, heartbeat, stdin, shell, ioPub) =>
val testTargetName = java.util.UUID.randomUUID().toString
var openExecuted = false
client.comm.register(testTargetName).addOpenHandler {
(_, _, _, _) => openExecuted = true
}
// Send a comm_open (as if a kernel did it)
val (_, message) = buildZMQCommOpen(testTargetName, v5.MsgData.Empty)
actorLoader.load(SocketType.IOPubClient) ! message
eventually { openExecuted should be (true) }
}
}
it("should execute open callbacks if the target is found") {
withSparkClient { (client, actorLoader, heartbeat, stdin, shell, ioPub) =>
val testTargetName = java.util.UUID.randomUUID().toString
var openExecuted = false
client.comm.register(testTargetName).addOpenHandler {
(_, _, _, _) => openExecuted = true
}
// Send a comm_open (as if a kernel did it)
val (_, message) = buildZMQCommOpen(testTargetName, v5.MsgData.Empty)
actorLoader.load(SocketType.IOPubClient) ! message
eventually { openExecuted should be (true) }
}
}
}
describe("receiving Comm API message from a client") {
it("should not execute message callbacks if the Comm id is not found") {
withSparkClient { (client, actorLoader, heartbeat, stdin, shell, ioPub) =>
val testTargetName = java.util.UUID.randomUUID().toString
val testCommId = java.util.UUID.randomUUID().toString
var msgExecuted = false
client.comm.register(testTargetName)
.addCloseHandler { (_, _, _) => msgExecuted = true }
// Send a comm_close (as if a kernel did it)
val (_, msgMessage) = buildZMQCommMsg(testCommId, v5.MsgData.Empty)
actorLoader.load(SocketType.IOPubClient) ! msgMessage
intercept[TestFailedDueToTimeoutException] {
eventually { msgExecuted should be (true) }
}
}
}
it("should execute message callbacks if the Comm id is found") {
withSparkClient { (client, actorLoader, heartbeat, stdin, shell, ioPub) =>
val testTargetName = java.util.UUID.randomUUID().toString
val testCommId = java.util.UUID.randomUUID().toString
var openExecuted = false
var msgExecuted = false
client.comm.register(testTargetName)
.addOpenHandler { (_, _, _, _) => openExecuted = true }
.addMsgHandler { (_, _, _) => msgExecuted = true }
// Send a comm_open (as if a kernel did it)
val (_, openMessage) =
buildZMQCommOpen(testTargetName, v5.MsgData.Empty, testCommId)
actorLoader.load(SocketType.IOPubClient) ! openMessage
eventually { openExecuted should be (true) }
// Send a comm_msg (as if a kernel did it)
val (_, msgMessage) = buildZMQCommMsg(testCommId, v5.MsgData.Empty)
actorLoader.load(SocketType.IOPubClient) ! msgMessage
eventually { msgExecuted should be (true) }
}
}
}
describe("receiving Comm API close from a client") {
it("should not execute close callbacks if the Comm id is not found") {
withSparkClient { (client, actorLoader, heartbeat, stdin, shell, ioPub) =>
val testTargetName = java.util.UUID.randomUUID().toString
val testCommId = java.util.UUID.randomUUID().toString
var closeExecuted = false
client.comm.register(testTargetName)
.addCloseHandler { (_, _, _) => closeExecuted = true }
// Send a comm_close (as if a kernel did it)
val (_, closeMessage) = buildZMQCommClose(testCommId, v5.MsgData.Empty)
actorLoader.load(SocketType.IOPubClient) ! closeMessage
intercept[TestFailedDueToTimeoutException] {
eventually { closeExecuted should be (true) }
}
}
}
it("should execute close callbacks if the Comm id is found") {
withSparkClient { (client, actorLoader, heartbeat, stdin, shell, ioPub) =>
val testTargetName = java.util.UUID.randomUUID().toString
val testCommId = java.util.UUID.randomUUID().toString
var openExecuted = false
var closeExecuted = false
client.comm.register(testTargetName)
.addOpenHandler { (_, _, _, _) => openExecuted = true }
.addCloseHandler { (_, _, _) => closeExecuted = true }
// Send a comm_open (as if a kernel did it)
val (_, openMessage) =
buildZMQCommOpen(testTargetName, v5.MsgData.Empty, testCommId)
actorLoader.load(SocketType.IOPubClient) ! openMessage
eventually { openExecuted should be (true) }
// Send a comm_close (as if a kernel did it)
val (_, closeMessage) = buildZMQCommClose(testCommId, v5.MsgData.Empty)
actorLoader.load(SocketType.IOPubClient) ! closeMessage
eventually { closeExecuted should be (true) }
}
}
it("should unlink the Comm id from the target if the Comm id is found") {
withSparkClient { (client, actorLoader, heartbeat, stdin, shell, ioPub) =>
val testTargetName = java.util.UUID.randomUUID().toString
val testCommId = java.util.UUID.randomUUID().toString
var openExecuted = false
var closeExecuted = false
client.comm.register(testTargetName)
.addOpenHandler { (_, _, _, _) => openExecuted = true }
.addCloseHandler { (_, _, _) => closeExecuted = true }
// Send a comm_open (as if a kernel did it)
val (_, openMessage) =
buildZMQCommOpen(testTargetName, v5.MsgData.Empty, testCommId)
actorLoader.load(SocketType.IOPubClient) ! openMessage
eventually { openExecuted should be (true) }
// Send a comm_close (as if a kernel did it)
val (_, closeMessage1) = buildZMQCommClose(testCommId, v5.MsgData.Empty)
actorLoader.load(SocketType.IOPubClient) ! closeMessage1
eventually { closeExecuted should be (true) }
// Reset close event flag
closeExecuted = false
// Send a comm_close (again)
val (_, closeMessage2) = buildZMQCommClose(testCommId, v5.MsgData.Empty)
actorLoader.load(SocketType.IOPubClient) ! closeMessage2
intercept[TestFailedDueToTimeoutException] {
eventually { closeExecuted should be (true) }
}
}
}
}
}
}