/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.proto;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* This class handles communication with clients using NIO. There is one Cnxn
* per client, but only one thread doing the communication.
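 *
 * <p>A minimal usage sketch (assuming {@code conf} is a {@link ServerConfiguration}
 * prepared elsewhere; the echo {@link PacketProcessor} below is purely
 * illustrative and not part of this class):
 * <pre>{@code
 * NIOServerFactory factory = new NIOServerFactory(conf, new PacketProcessor() {
 *     public void processPacket(ByteBuffer packet, Cnxn src) {
 *         src.sendResponse(packet); // echo the request body back to the client
 *     }
 * });
 * factory.start();    // the selector loop runs on this daemon thread
 * // ... serve traffic ...
 * factory.shutdown();
 * }</pre>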
*/
public class NIOServerFactory extends Thread {
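    /**
     * Callback invoked by the selector thread for every complete packet read from
     * a client. The packet buffer holds the request body only; the 4-byte length
     * prefix has already been consumed. Responses are written back through
     * {@link Cnxn#sendResponse(ByteBuffer...)}.
     */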
public interface PacketProcessor {
public void processPacket(ByteBuffer packet, Cnxn src);
}
ServerStats stats = new ServerStats();
Logger LOG = LoggerFactory.getLogger(NIOServerFactory.class);
ServerSocketChannel ss;
Selector selector = Selector.open();
    /**
     * We use this buffer to do efficient socket I/O. Since there is only a single
     * selector thread doing I/O for this factory, we can use a member variable and
     * allocate it just once.
     */
ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
HashSet<Cnxn> cnxns = new HashSet<Cnxn>();
int outstandingLimit = 2000;
PacketProcessor processor;
long minLatency = 99999999;
ServerConfiguration conf;
private AtomicBoolean crashed = new AtomicBoolean(false);
private Object suspensionLock = new Object();
private boolean suspended = false;
public NIOServerFactory(ServerConfiguration conf, PacketProcessor processor) throws IOException {
super("NIOServerFactory-" + conf.getBookiePort());
setDaemon(true);
this.processor = processor;
this.conf = conf;
this.ss = ServerSocketChannel.open();
if (conf.getListeningInterface() == null) {
// listen on all interfaces
ss.socket().bind(new InetSocketAddress(conf.getBookiePort()));
} else {
ss.socket().bind(Bookie.getBookieAddress(conf));
}
ss.configureBlocking(false);
ss.register(selector, SelectionKey.OP_ACCEPT);
}
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) ss.socket().getLocalSocketAddress();
}
private void addCnxn(Cnxn cnxn) {
synchronized (cnxns) {
cnxns.add(cnxn);
}
}
public boolean isRunning() {
return !ss.socket().isClosed() && isAlive();
}
boolean hasCrashed() {
return crashed.get();
}
    /**
     * Stop the NIO server from processing requests (for testing). After its next
     * select, the selector thread parks on the suspension lock until
     * {@link #resumeProcessing()} is called.
     */
@VisibleForTesting
public void suspendProcessing() {
synchronized(suspensionLock) {
suspended = true;
}
}
    /**
     * Resume processing requests in the NIO server (for testing).
     */
@VisibleForTesting
public void resumeProcessing() {
synchronized(suspensionLock) {
suspended = false;
suspensionLock.notify();
}
}
@Override
public void run() {
while (!ss.socket().isClosed()) {
try {
selector.select(1000);
synchronized(suspensionLock) {
while (suspended) {
suspensionLock.wait();
}
}
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
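                // Handle the ready keys in random order, presumably so that no
                // single connection can consistently be serviced first and starve
                // the others.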
Collections.shuffle(selectedList);
for (SelectionKey k : selectedList) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
Cnxn cnxn = new Cnxn(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
Cnxn c = (Cnxn) k.attachment();
c.doIO(k);
}
}
selected.clear();
} catch (Exception e) {
LOG.warn("Exception in server socket loop: " + ss.socket().getInetAddress(), e);
} catch (Throwable e) {
LOG.error("Error in server socket loop: " + ss.socket().getInetAddress(), e);
crashed.set(true);
break;
}
}
LOG.info("NIOServerCnxn factory exitedloop.");
clear();
}
/**
* clear all the connections in the selector
*
*/
synchronized public void clear() {
selector.wakeup();
synchronized (cnxns) {
// got to clear all the connections that we have in the selector
for (Iterator<Cnxn> it = cnxns.iterator(); it.hasNext();) {
Cnxn cnxn = it.next();
it.remove();
try {
cnxn.close();
} catch (Exception e) {
// Do nothing.
}
}
}
}
public void shutdown() {
try {
ss.close();
clear();
this.interrupt();
this.join();
} catch (InterruptedException e) {
LOG.warn("Interrupted", e);
} catch (Exception e) {
LOG.error("Unexpected exception", e);
}
}
    /**
     * Sending this zero-length sentinel buffer will cause the connection to be
     * closed when we do a send.
     */
static final ByteBuffer closeConn = ByteBuffer.allocate(0);
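    /**
     * Per-client connection state: the socket, its selection key, the buffer
     * currently being filled by reads, and the queue of buffers waiting to be
     * written. All socket I/O on a Cnxn happens on the factory's selector thread.
     */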
public class Cnxn {
private SocketChannel sock;
private SelectionKey sk;
boolean initialized;
ByteBuffer lenBuffer = ByteBuffer.allocate(4);
ByteBuffer incomingBuffer = lenBuffer;
LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
int sessionTimeout;
void doIO(SelectionKey k) throws InterruptedException {
try {
if (sock == null) {
return;
}
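                // Reads are two-phased: incomingBuffer first aliases the 4-byte
                // lenBuffer; once the length is known it is replaced by a buffer
                // of exactly that size, and the complete packet is handed to the
                // processor when that buffer fills up.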
if (k.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
LOG.info("Peer closed connection. rc={} {}", rc, sock);
close();
return;
}
if (incomingBuffer.remaining() == 0) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
readLength(k);
} else {
cnxnStats.packetsReceived++;
ServerStats.getInstance().incrementPacketsReceived();
try {
readRequest();
} finally {
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
}
}
}
if (k.isWritable()) {
if (outgoingBuffers.size() > 0) {
// ZooLog.logTraceMessage(LOG,
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
// "sk " + k + " is valid: " +
// k.isValid());
/*
* This is going to reset the buffer position to 0 and
* the limit to the size of the buffer, so that we can
* fill it with data from the non-direct buffers that we
* need to send.
*/
directBuffer.clear();
for (ByteBuffer b : outgoingBuffers) {
if (directBuffer.remaining() < b.remaining()) {
                                /*
                                 * When we call put later, if the directBuffer is
                                 * too small to hold everything, nothing will be
                                 * copied, so we've got to slice the buffer if it's
                                 * too big.
                                 */
b = (ByteBuffer) b.slice().limit(directBuffer.remaining());
}
                            /*
                             * put() is going to modify the positions of both
                             * buffers, but we don't want to change the position
                             * of the source buffers (we'll do that after the
                             * send, if needed), so we save and reset the
                             * position after the copy.
                             */
int p = b.position();
directBuffer.put(b);
b.position(p);
if (directBuffer.remaining() == 0) {
break;
}
}
/*
* Do the flip: limit becomes position, position gets
* set to 0. This sets us up for the write.
*/
directBuffer.flip();
int sent = sock.write(directBuffer);
ByteBuffer bb;
// Remove the buffers that we have sent
while (outgoingBuffers.size() > 0) {
bb = outgoingBuffers.peek();
if (bb == closeConn) {
throw new IOException("closing");
}
int left = bb.remaining() - sent;
if (left > 0) {
/*
* We only partially sent this buffer, so we
* update the position and exit the loop.
*/
bb.position(bb.position() + sent);
break;
}
cnxnStats.packetsSent++;
/* We've sent the whole buffer, so drop the buffer */
sent -= bb.remaining();
ServerStats.getInstance().incrementPacketsSent();
outgoingBuffers.remove();
}
// ZooLog.logTraceMessage(LOG,
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
// outgoingBuffers.size() = " + outgoingBuffers.size());
}
synchronized (this) {
if (outgoingBuffers.size() == 0) {
if (!initialized && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
throw new IOException("Responded to info probe");
}
sk.interestOps(sk.interestOps() & (~SelectionKey.OP_WRITE));
} else {
sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
}
}
}
} catch (CancelledKeyException e) {
close();
} catch (IOException e) {
// LOG.error("FIXMSG",e);
close();
}
}
private void readRequest() throws IOException {
incomingBuffer = incomingBuffer.slice();
processor.processPacket(incomingBuffer, this);
}
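        /**
         * Stop reading from this connection by clearing read interest (presumably
         * called by the packet processor to throttle a busy client).
         * {@link #enableRecv()} turns reads back on, and {@link #sendResponse} does
         * so automatically once the outstanding-request count is below the limit.
         */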
public void disableRecv() {
sk.interestOps(sk.interestOps() & (~SelectionKey.OP_READ));
}
public void enableRecv() {
if (sk.isValid()) {
int interest = sk.interestOps();
if ((interest & SelectionKey.OP_READ) == 0) {
sk.interestOps(interest | SelectionKey.OP_READ);
}
}
}
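        /**
         * Requests are framed as a 4-byte big-endian length followed by the packet
         * body. Once the length prefix has been read, allocate a body buffer of
         * exactly that size; lengths outside [0, 0xfffff] are rejected.
         */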
private void readLength(SelectionKey k) throws IOException {
// Read the length, now get the buffer
int len = lenBuffer.getInt();
if (len < 0 || len > 0xfffff) {
throw new IOException("Len error " + len);
}
incomingBuffer = ByteBuffer.allocate(len);
}
/**
* The number of requests that have been submitted but not yet responded
* to.
*/
int outstandingRequests;
/*
* (non-Javadoc)
*
* @see org.apache.zookeeper.server.ServerCnxnIface#getSessionTimeout()
*/
public int getSessionTimeout() {
return sessionTimeout;
}
String peerName = null;
public Cnxn(SocketChannel sock, SelectionKey sk) throws IOException {
this.sock = sock;
this.sk = sk;
sock.socket().setTcpNoDelay(conf.getServerTcpNoDelay());
sock.socket().setSoLinger(true, 2);
sk.interestOps(SelectionKey.OP_READ);
if (LOG.isTraceEnabled()) {
peerName = sock.socket().toString();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
@Override
public String toString() {
return "NIOServerCnxn object with sock = " + sock + " and sk = " + sk;
}
public String getPeerName() {
if (peerName == null) {
peerName = sock.socket().toString();
}
return peerName;
}
boolean closed;
/*
* (non-Javadoc)
*
* @see org.apache.zookeeper.server.ServerCnxnIface#close()
*/
public void close() {
if (closed) {
return;
}
closed = true;
synchronized (cnxns) {
cnxns.remove(this);
}
LOG.debug("close NIOServerCnxn: {}", sock);
try {
/*
* The following sequence of code is stupid! You would think
* that only sock.close() is needed, but alas, it doesn't work
* that way. If you just do sock.close() there are cases where
* the socket doesn't actually close...
*/
sock.socket().shutdownOutput();
} catch (IOException e) {
// This is a relatively common exception that we can't avoid
}
try {
sock.socket().shutdownInput();
} catch (IOException e) {
}
try {
sock.socket().close();
} catch (IOException e) {
LOG.error("FIXMSG", e);
}
try {
sock.close();
                // XXX The next line doesn't seem to be needed, but some posts
                // to forums suggest that it is needed. Keep in mind if errors
                // in this section arise.
                // factory.selector.wakeup();
} catch (IOException e) {
LOG.error("FIXMSG", e);
}
sock = null;
if (sk != null) {
try {
// need to cancel this selection key from the selector
sk.cancel();
} catch (Exception e) {
}
}
}
private void makeWritable(SelectionKey sk) {
try {
selector.wakeup();
if (sk.isValid()) {
sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
}
} catch (RuntimeException e) {
LOG.error("Problem setting writable", e);
throw e;
}
}
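        /**
         * Queues a response on this connection. A single 4-byte length header
         * covering the combined size of the buffers is prepended, mirroring the
         * framing expected on the read side, and the selection key is marked
         * writable so the selector thread flushes the queue.
         */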
private void sendBuffers(ByteBuffer bb[]) {
ByteBuffer len = ByteBuffer.allocate(4);
int total = 0;
for (int i = 0; i < bb.length; i++) {
if (bb[i] != null) {
total += bb[i].remaining();
}
}
LOG.debug("Sending response of size {} to {}", total, peerName);
len.putInt(total);
len.flip();
outgoingBuffers.add(len);
for (int i = 0; i < bb.length; i++) {
if (bb[i] != null) {
outgoingBuffers.add(bb[i]);
}
}
makeWritable(sk);
}
public void sendResponse(ByteBuffer... bb) {
synchronized (this) {
if (closed) {
return;
}
sendBuffers(bb);
outstandingRequests--;
}
            // Acquire these monitors in order to avoid deadlock during shutdown.
            // It doesn't matter much whether we do this synchronously with
            // sendBuffers, as long as it happens.
synchronized (NIOServerFactory.this) {
synchronized (this) {
// check throttling
if (outstandingRequests < outstandingLimit) {
sk.selector().wakeup();
enableRecv();
}
}
}
}
public InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) sock.socket().getRemoteSocketAddress();
}
private class CnxnStats {
long packetsSent = 0;
long packetsReceived = 0;
/**
* The number of requests that have been submitted but not yet
* responded to.
*/
public long getOutstandingRequests() {
synchronized(Cnxn.this) {
return outstandingRequests;
}
}
public long getPacketsReceived() {
return packetsReceived;
}
public long getPacketsSent() {
return packetsSent;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Channel channel = sk.channel();
if (channel instanceof SocketChannel) {
sb.append(" ").append(((SocketChannel) channel).socket().getRemoteSocketAddress()).append("[")
.append(Integer.toHexString(sk.interestOps())).append("](queued=").append(
getOutstandingRequests()).append(",recved=").append(getPacketsReceived()).append(
",sent=").append(getPacketsSent()).append(")\n");
}
return sb.toString();
}
}
private CnxnStats cnxnStats = new CnxnStats();
public CnxnStats getStats() {
return cnxnStats;
}
}
}