blob: b056e250b901411c4ee9bfa0c222cebd4ab51e64 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.network
import java.util.concurrent._
import java.util.concurrent.atomic._
import java.net._
import java.io._
import java.nio.channels._
import kafka.common.KafkaException
import kafka.utils._
/**
* An NIO socket server. The threading model is
* 1 Acceptor thread that handles new connections
* N Processor threads that each have their own selector and read requests from sockets
* M Handler threads that handle requests and produce responses back to the processor threads for writing.
*/
class SocketServer(val brokerId: Int,
val host: String,
val port: Int,
val numProcessorThreads: Int,
val maxQueuedRequests: Int,
val maxRequestSize: Int = Int.MaxValue) extends Logging {
this.logIdent = "[Socket Server on Broker " + brokerId + "], "
private val time = SystemTime
private val processors = new Array[Processor](numProcessorThreads)
@volatile private var acceptor: Acceptor = null
val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
/**
* Start the socket server
*/
def startup() {
for(i <- 0 until numProcessorThreads) {
processors(i) = new Processor(i, time, maxRequestSize, requestChannel)
Utils.newThread("kafka-processor-%d-%d".format(port, i), processors(i), false).start()
}
// register the processor threads for notification of responses
requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
// start accepting connections
this.acceptor = new Acceptor(host, port, processors)
Utils.newThread("kafka-acceptor", acceptor, false).start()
acceptor.awaitStartup
info("started")
}
/**
* Shutdown the socket server
*/
def shutdown() = {
info("shutting down")
if(acceptor != null)
acceptor.shutdown()
for(processor <- processors)
processor.shutdown()
info("shut down completely")
}
}
/**
* A base class with some helper variables and methods
*/
private[kafka] abstract class AbstractServerThread extends Runnable with Logging {
protected val selector = Selector.open();
private val startupLatch = new CountDownLatch(1)
private val shutdownLatch = new CountDownLatch(1)
private val alive = new AtomicBoolean(false)
/**
* Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
*/
def shutdown(): Unit = {
alive.set(false)
selector.wakeup()
shutdownLatch.await
}
/**
* Wait for the thread to completely start up
*/
def awaitStartup(): Unit = startupLatch.await
/**
* Record that the thread startup is complete
*/
protected def startupComplete() = {
alive.set(true)
startupLatch.countDown
}
/**
* Record that the thread shutdown is complete
*/
protected def shutdownComplete() = shutdownLatch.countDown
/**
* Is the server still running?
*/
protected def isRunning = alive.get
/**
* Wakeup the thread for selection.
*/
def wakeup() = selector.wakeup()
}
/**
* Thread that accepts and configures new connections. There is only need for one of these
*/
private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor]) extends AbstractServerThread {
val serverChannel = openServerSocket(host, port)
/**
* Accept loop that checks for new connection attempts
*/
def run() {
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
startupComplete()
var currentProcessor = 0
while(isRunning) {
val ready = selector.select(500)
if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
while(iter.hasNext && isRunning) {
var key: SelectionKey = null
try {
key = iter.next
iter.remove()
if(key.isAcceptable)
accept(key, processors(currentProcessor))
else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
// round robin to the next processor thread
currentProcessor = (currentProcessor + 1) % processors.length
} catch {
case e: Throwable => error("Error in acceptor", e)
}
}
}
}
debug("Closing server socket and selector.")
swallowError(serverChannel.close())
swallowError(selector.close())
shutdownComplete()
}
/*
* Create a server socket to listen for connections on.
*/
def openServerSocket(host: String, port: Int): ServerSocketChannel = {
val socketAddress =
if(host == null || host.trim.isEmpty)
new InetSocketAddress(port)
else
new InetSocketAddress(host, port)
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
try {
serverChannel.socket.bind(socketAddress)
info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, port))
} catch {
case e: SocketException =>
throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e)
}
serverChannel
}
/*
* Accept a new connection
*/
def accept(key: SelectionKey, processor: Processor) {
val socketChannel = key.channel().asInstanceOf[ServerSocketChannel].accept()
debug("Accepted connection from " + socketChannel.socket.getInetAddress() + " on " + socketChannel.socket.getLocalSocketAddress)
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
processor.accept(socketChannel)
}
}
/**
* Thread that processes all requests from a single connection. There are N of these running in parallel
* each of which has its own selectors
*/
private[kafka] class Processor(val id: Int,
val time: Time,
val maxRequestSize: Int,
val requestChannel: RequestChannel) extends AbstractServerThread {
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]();
override def run() {
startupComplete()
while(isRunning) {
// setup any new connections that have been queued up
configureNewConnections()
// register any new responses for writing
processNewResponses()
val ready = selector.select(300)
if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
while(iter.hasNext && isRunning) {
var key: SelectionKey = null
try {
key = iter.next
iter.remove()
if(key.isReadable)
read(key)
else if(key.isWritable)
write(key)
else if(!key.isValid)
close(key)
else
throw new IllegalStateException("Unrecognized key state for processor thread.")
} catch {
case e: EOFException => {
info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
close(key)
} case e: InvalidRequestException => {
info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
close(key)
} case e: Throwable => {
error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
close(key)
}
}
}
}
}
debug("Closing selector.")
swallowError(selector.close())
shutdownComplete()
}
private def processNewResponses() {
var curr = requestChannel.receiveResponse(id)
while(curr != null) {
trace("Socket server received response to send, registering for write: " + curr)
val key = curr.request.requestKey.asInstanceOf[SelectionKey]
try {
key.interestOps(SelectionKey.OP_WRITE)
key.attach(curr)
} catch {
case e: CancelledKeyException => {
debug("Ignoring response for closed socket.")
close(key)
}
}finally {
curr = requestChannel.receiveResponse(id)
}
}
}
private def close(key: SelectionKey) {
val channel = key.channel.asInstanceOf[SocketChannel]
debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
swallowError(channel.socket().close())
swallowError(channel.close())
key.attach(null)
swallowError(key.cancel())
}
/**
* Queue up a new connection for reading
*/
def accept(socketChannel: SocketChannel) {
newConnections.add(socketChannel)
wakeup()
}
/**
* Register any new connections that have been queued up
*/
private def configureNewConnections() {
while(newConnections.size() > 0) {
val channel = newConnections.poll()
debug("Listening to new connection from " + channel.socket.getRemoteSocketAddress)
channel.register(selector, SelectionKey.OP_READ)
}
}
/*
* Process reads from ready sockets
*/
def read(key: SelectionKey) {
val socketChannel = channelFor(key)
var receive = key.attachment.asInstanceOf[Receive]
if(key.attachment == null) {
receive = new BoundedByteBufferReceive(maxRequestSize)
key.attach(receive)
}
val read = receive.readFrom(socketChannel)
val address = socketChannel.socket.getRemoteSocketAddress();
trace(read + " bytes read from " + address)
if(read < 0) {
close(key)
} else if(receive.complete) {
val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
requestChannel.sendRequest(req)
trace("Received request, sending for processing by handler: " + req)
key.attach(null)
} else {
// more reading to be done
key.interestOps(SelectionKey.OP_READ)
wakeup()
}
}
/*
* Process writes to ready sockets
*/
def write(key: SelectionKey) {
val socketChannel = channelFor(key)
val response = key.attachment().asInstanceOf[RequestChannel.Response]
val responseSend = response.responseSend
if(responseSend == null)
throw new IllegalStateException("Registered for write interest but no response attached to key.")
val written = responseSend.writeTo(socketChannel)
trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress())
if(responseSend.complete) {
response.request.updateRequestMetrics()
key.attach(null)
key.interestOps(SelectionKey.OP_READ)
} else {
key.interestOps(SelectionKey.OP_WRITE)
wakeup()
}
}
private def channelFor(key: SelectionKey) = key.channel().asInstanceOf[SocketChannel]
}