blob: 0464d5742dae0f10cb6e9f33deb2a9962632f666 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.toree.communication.socket
import org.apache.toree.utils.LogLike
import org.zeromq.{ZMsg, ZMQ}
import org.zeromq.ZMQ.Context
import scala.collection.JavaConverters._
import scala.util.Try
/**
* Represents the runnable component of a socket that processes messages and
* sends messages placed on an outbound queue.
*
* @param context The ZMQ context to use with this runnable to create a socket
* @param socketType The type of socket to create
* @param inboundMessageCallback The callback to invoke when receiving a message
* on the socket created
* @param socketOptions The options to use when creating the socket
*/
class ZeroMQSocketRunnable(
  private val context: Context,
  private val socketType: SocketType,
  private val inboundMessageCallback: Option[(Seq[Array[Byte]]) => Unit],
  private val socketOptions: SocketOption*
) extends SocketRunnable[ZMsg](inboundMessageCallback)
  with LogLike {

  // A socket must be attached to exactly one endpoint, either by binding or
  // by connecting; reject any other combination up front.
  require(socketOptions.count {
    case _: Bind    => true
    case _: Connect => true
    case _          => false
  } == 1, "ZeroMQ socket needs exactly one bind or connect!")

  // Loop guard for run(); cleared by close(). Volatile because close() is
  // expected to be invoked from a thread other than the one executing run().
  @volatile private var notClosed: Boolean = true

  // Set once processOptions has finished and cleared again by close().
  @volatile private var _isProcessing: Boolean = false

  /**
   * Indicates the processing state of this runnable.
   *
   * @return True if processing messages, otherwise false
   */
  override def isProcessing: Boolean = _isProcessing

  /**
   * Processes the provided options, performing associated actions on the
   * specified socket. Once all options have been applied, the runnable is
   * flagged as processing.
   *
   * @param socket The socket to apply actions on
   */
  protected def processOptions(socket: ZMQ.Socket): Unit = {
    val socketOptionsString = socketOptions.map("\n- " + _.toString).mkString("")
    logger.trace(
      s"Processing options for socket $socketType: $socketOptionsString"
    )

    // Split our options based on connection (bind/connect) and everything else
    val (connectionOptions, otherOptions) = socketOptions.partition {
      case Bind(_) | Connect(_) => true
      case _                    => false
    }

    // Apply non-connection options first since some (like identity) must be
    // run before the socket does a bind/connect
    otherOptions.foreach {
      case Linger(milliseconds) => socket.setLinger(milliseconds)
      case Subscribe(topic)     => socket.subscribe(topic)
      case Identity(identity)   => socket.setIdentity(identity)
      case option               => logger.warn(s"Unknown option: $option")
    }

    // Perform our bind or connect
    connectionOptions.foreach {
      case Bind(address)    => socket.bind(address)
      case Connect(address) => socket.connect(address)
      case option           =>
        logger.warn(s"Unknown connection option: $option")
    }

    _isProcessing = true
  }

  /**
   * Sends the next outbound message from the outbound message queue.
   *
   * @param socket The socket to use when sending the message
   *
   * @return True if a message was sent, otherwise false
   */
  protected def processNextOutboundMessage(socket: ZMQ.Socket): Boolean = {
    // poll() yields null when the queue is empty; Option(...) maps that to None
    val message = Option(outboundMessages.poll())
    message.foreach(_.send(socket))
    message.nonEmpty
  }

  /**
   * Retrieves the next inbound message (if available) and invokes the
   * inbound message callback with the raw bytes of each frame.
   *
   * @param socket The socket whose next incoming message to retrieve
   * @param flags The flags handed to the receive call; defaults to
   *              ZMQ.DONTWAIT
   */
  protected def processNextInboundMessage(
    socket: ZMQ.Socket,
    flags: Int = ZMQ.DONTWAIT
  ): Unit = {
    // A null result (no message received) is mapped to None and skipped
    Option(ZMsg.recvMsg(socket, flags)).foreach(zMsg => {
      inboundMessageCallback.foreach(_(zMsg.asScala.toSeq
        .map(zFrame => zFrame.getData)
      ))
    })
  }

  /**
   * Creates a new instance of a ZMQ Socket.
   *
   * @param zmqContext The context to use to create the socket
   * @param socketType The type of socket to create
   *
   * @return The new ZMQ.Socket instance
   */
  protected def newZmqSocket(
    zmqContext: ZMQ.Context,
    socketType: Int
  ): ZMQ.Socket = zmqContext.socket(socketType)

  /**
   * Creates the socket, applies the configured options, and then alternates
   * between sending one queued outbound message and receiving one inbound
   * message until the runnable is closed. The socket is always closed when
   * the loop ends, whether normally or because of an exception.
   */
  override def run(): Unit = {
    val socket = newZmqSocket(context, socketType.`type`)

    try {
      processOptions(socket)

      while (notClosed) {
        Try(processNextOutboundMessage(socket)).failed.foreach(
          (e: Throwable) =>
            logger.error("Failed to send next outgoing message!", e)
        )

        Try(processNextInboundMessage(socket)).failed.foreach {
          // Deliberately swallowed without logging. NOTE(review): presumably
          // this is the common "Cannot receive another request" failure,
          // which would otherwise flood the log on every loop iteration --
          // confirm.
          case _: java.lang.IllegalStateException => ()
          case e =>
            logger.error("Failed to retrieve next incoming message!", e)
        }

        // Brief pause between iterations so the loop does not spin
        Thread.sleep(1)
      }
    } catch {
      case ex: Exception =>
        logger.error("Unexpected exception in 0mq socket runnable!", ex)
    } finally {
      try {
        socket.close()
      } catch {
        case ex: Exception =>
          // Pass the caught exception explicitly; a `_: Throwable`
          // placeholder here would only build an unused function and
          // nothing would be logged.
          logger.error("Failed to close socket!", ex)
      }
    }
  }

  /**
   * Marks the runnable as closed such that it eventually stops processing
   * messages and closes the socket.
   *
   * @throws AssertionError If the runnable is not processing messages or has
   *                        already been closed
   */
  override def close(): Unit = {
    assert(_isProcessing && notClosed,
      "Runnable is not processing or is closed!")

    _isProcessing = false
    notClosed = false
  }
}