blob: 6dd1224e9f26e1e0748ff67d0d9ce4a94c469d7c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package org.apache.toree.kernel.protocol.v5.kernel
import java.nio.charset.Charset
import org.apache.pekko.util.{ByteString, Timeout}
import org.apache.toree.communication.ZMQMessage
import org.apache.toree.kernel.protocol.v5._
import org.apache.toree.utils.LogLike
import play.api.libs.json.JsonValidationError
import play.api.libs.json.{JsPath, Json, Reads}
import scala.concurrent.duration._
object Utilities extends LogLike {
//
// NOTE: This is brought in to remove feature warnings regarding the use of
// implicit conversions regarding the following:
//
// 1. ByteStringToString
// 2. ZMQMessageToKernelMessage
//
import scala.language.implicitConversions
/**
* This timeout needs to be defined for the Akka asks to timeout
*/
implicit val timeout = Timeout(21474835.seconds)
implicit def ByteStringToString(byteString : ByteString) : String = {
new String(byteString.toArray, Charset.forName("UTF-8"))
}
implicit def StringToByteString(string : String) : ByteString = {
ByteString(string.getBytes)
}
implicit def ZMQMessageToKernelMessage(message: ZMQMessage): KernelMessage = {
val delimiterIndex: Int =
message.frames.indexOf(ByteString("<IDS|MSG>".getBytes))
// TODO Handle the case where there is no delimiter
val ids: Seq[Array[Byte]] =
message.frames.take(delimiterIndex).map(
(byteString : ByteString) => { byteString.toArray }
)
val header = Json.parse(message.frames(delimiterIndex + 2)).as[Header]
// TODO: Investigate better solution than setting parentHeader to null for {}
val parentHeader = parseAndHandle(message.frames(delimiterIndex + 3),
ParentHeader.headerReads,
handler = (valid: ParentHeader) => valid,
errHandler = _ => null
)
val metadata = Json.parse(message.frames(delimiterIndex + 4)).as[Metadata]
KMBuilder().withIds(ids.toList)
.withSignature(message.frame(delimiterIndex + 1))
.withHeader(header)
.withParentHeader(parentHeader)
.withMetadata(metadata)
.withContentString(message.frame(delimiterIndex + 5)).build(false)
}
implicit def KernelMessageToZMQMessage(kernelMessage : KernelMessage) : ZMQMessage = {
val frames: scala.collection.mutable.ListBuffer[ByteString] = scala.collection.mutable.ListBuffer()
kernelMessage.ids.map((id : Array[Byte]) => frames += ByteString.apply(id) )
frames += "<IDS|MSG>"
frames += kernelMessage.signature
frames += Json.toJson(kernelMessage.header).toString()
frames += Json.toJson(kernelMessage.parentHeader).toString()
frames += Json.toJson(kernelMessage.metadata).toString
frames += kernelMessage.contentString
ZMQMessage(frames : _*)
}
def parseAndHandle[T, U](json: String, reads: Reads[T],
handler: T => U) : U = {
parseAndHandle(json, reads, handler,
(invalid: Seq[(JsPath, Seq[JsonValidationError])]) => {
logger.error(s"Could not parse JSON, ${json}")
throw new Throwable(s"Could not parse JSON, ${json}")
}
)
}
def parseAndHandle[T, U](json: String, reads: Reads[T],
handler: T => U,
errHandler: Seq[(JsPath, Seq[JsonValidationError])] => U) : U = {
Json.parse(json).validate[T](reads).fold(
errHandler,
(content: T) => handler(content)
)
}
}