blob: 83e7491e51b5de9574bb464ab312e58264109c30 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.security.cert.Certificate;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ConnectRequest;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread;
import org.apache.zookeeper.server.command.CommandExecutor;
import org.apache.zookeeper.server.command.FourLetterCommands;
import org.apache.zookeeper.server.command.NopCommand;
import org.apache.zookeeper.server.command.SetTraceMaskCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class handles communication with clients using NIO. There is one per
* client, but only one thread doing the communication.
*/
public class NIOServerCnxn extends ServerCnxn {
private static final Logger LOG = LoggerFactory.getLogger(NIOServerCnxn.class);
private final NIOServerCnxnFactory factory;
private final SocketChannel sock;
private final SelectorThread selectorThread;
private final SelectionKey sk;
private boolean initialized;
private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
protected ByteBuffer incomingBuffer = lenBuffer;
private final Queue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
private int sessionTimeout;
/**
* This is the id that uniquely identifies the session of a client. Once
* this session is no longer active, the ephemeral nodes will go away.
*/
private long sessionId;
/**
* Client socket option for TCP keepalive
*/
private final boolean clientTcpKeepAlive = Boolean.getBoolean("zookeeper.clientTcpKeepAlive");
public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk, NIOServerCnxnFactory factory, SelectorThread selectorThread) throws IOException {
super(zk);
this.sock = sock;
this.sk = sk;
this.factory = factory;
this.selectorThread = selectorThread;
if (this.factory.login != null) {
this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
}
sock.socket().setTcpNoDelay(true);
/* set socket linger to false, so that socket close does not block */
sock.socket().setSoLinger(false, -1);
sock.socket().setKeepAlive(clientTcpKeepAlive);
InetAddress addr = ((InetSocketAddress) sock.socket().getRemoteSocketAddress()).getAddress();
addAuthInfo(new Id("ip", addr.getHostAddress()));
this.sessionTimeout = factory.sessionlessCnxnTimeout;
}
/* Send close connection packet to the client, doIO will eventually
* close the underlying machinery (like socket, selectorkey, etc...)
*/
public void sendCloseSession() {
sendBuffer(ServerCnxnFactory.closeConn);
}
/**
* send buffer without using the asynchronous
* calls to selector and then close the socket
* @param bb
*/
void sendBufferSync(ByteBuffer bb) {
try {
/* configure socket to be blocking
* so that we dont have to do write in
* a tight while loop
*/
if (bb != ServerCnxnFactory.closeConn) {
if (sock.isOpen()) {
sock.configureBlocking(true);
sock.write(bb);
}
packetSent();
}
} catch (IOException ie) {
LOG.error("Error sending data synchronously ", ie);
}
}
/**
* sendBuffer pushes a byte buffer onto the outgoing buffer queue for
* asynchronous writes.
*/
public void sendBuffer(ByteBuffer... buffers) {
if (LOG.isTraceEnabled()) {
LOG.trace("Add a buffer to outgoingBuffers, sk {} is valid: {}", sk, sk.isValid());
}
synchronized (outgoingBuffers) {
for (ByteBuffer buffer : buffers) {
outgoingBuffers.add(buffer);
}
outgoingBuffers.add(packetSentinel);
}
requestInterestOpsUpdate();
}
/**
* When read on socket failed, this is typically because client closed the
* connection. In most cases, the client does this when the server doesn't
* respond within 2/3 of session timeout. This possibly indicates server
* health/performance issue, so we need to log and keep track of stat
*
* @throws EndOfStreamException
*/
private void handleFailedRead() throws EndOfStreamException {
setStale();
ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
throw new EndOfStreamException("Unable to read additional data from client,"
+ " it probably closed the socket:"
+ " address = " + sock.socket().getRemoteSocketAddress() + ","
+ " session = 0x" + Long.toHexString(sessionId),
DisconnectReason.UNABLE_TO_READ_FROM_CLIENT);
}
/** Read the request payload (everything following the length prefix) */
private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
if (rc < 0) {
handleFailedRead();
}
}
if (incomingBuffer.remaining() == 0) { // have we read length bytes?
incomingBuffer.flip();
packetReceived(4 + incomingBuffer.remaining());
if (!initialized) {
readConnectRequest();
} else {
readRequest();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
}
/**
* This boolean tracks whether the connection is ready for selection or
* not. A connection is marked as not ready for selection while it is
* processing an IO request. The flag is used to gatekeep pushing interest
* op updates onto the selector.
*/
private final AtomicBoolean selectable = new AtomicBoolean(true);
public boolean isSelectable() {
return sk.isValid() && selectable.get();
}
public void disableSelectable() {
selectable.set(false);
}
public void enableSelectable() {
selectable.set(true);
}
private void requestInterestOpsUpdate() {
if (isSelectable()) {
selectorThread.addInterestOpsUpdateRequest(sk);
}
}
void handleWrite(SelectionKey k) throws IOException {
if (outgoingBuffers.isEmpty()) {
return;
}
/*
* This is going to reset the buffer position to 0 and the
* limit to the size of the buffer, so that we can fill it
* with data from the non-direct buffers that we need to
* send.
*/
ByteBuffer directBuffer = NIOServerCnxnFactory.getDirectBuffer();
if (directBuffer == null) {
ByteBuffer[] bufferList = new ByteBuffer[outgoingBuffers.size()];
// Use gathered write call. This updates the positions of the
// byte buffers to reflect the bytes that were written out.
sock.write(outgoingBuffers.toArray(bufferList));
// Remove the buffers that we have sent
ByteBuffer bb;
while ((bb = outgoingBuffers.peek()) != null) {
if (bb == ServerCnxnFactory.closeConn) {
throw new CloseRequestException("close requested", DisconnectReason.CLIENT_CLOSED_CONNECTION);
}
if (bb == packetSentinel) {
packetSent();
}
if (bb.remaining() > 0) {
break;
}
outgoingBuffers.remove();
}
} else {
directBuffer.clear();
for (ByteBuffer b : outgoingBuffers) {
if (directBuffer.remaining() < b.remaining()) {
/*
* When we call put later, if the directBuffer is to
* small to hold everything, nothing will be copied,
* so we've got to slice the buffer if it's too big.
*/
b = (ByteBuffer) b.slice().limit(directBuffer.remaining());
}
/*
* put() is going to modify the positions of both
* buffers, put we don't want to change the position of
* the source buffers (we'll do that after the send, if
* needed), so we save and reset the position after the
* copy
*/
int p = b.position();
directBuffer.put(b);
b.position(p);
if (directBuffer.remaining() == 0) {
break;
}
}
/*
* Do the flip: limit becomes position, position gets set to
* 0. This sets us up for the write.
*/
directBuffer.flip();
int sent = sock.write(directBuffer);
ByteBuffer bb;
// Remove the buffers that we have sent
while ((bb = outgoingBuffers.peek()) != null) {
if (bb == ServerCnxnFactory.closeConn) {
throw new CloseRequestException("close requested", DisconnectReason.CLIENT_CLOSED_CONNECTION);
}
if (bb == packetSentinel) {
packetSent();
}
if (sent < bb.remaining()) {
/*
* We only partially sent this buffer, so we update
* the position and exit the loop.
*/
bb.position(bb.position() + sent);
break;
}
/* We've sent the whole buffer, so drop the buffer */
sent -= bb.remaining();
outgoingBuffers.remove();
}
}
}
/**
* Only used in order to allow testing
*/
protected boolean isSocketOpen() {
return sock.isOpen();
}
/**
* Handles read/write IO on connection.
*/
void doIO(SelectionKey k) throws InterruptedException {
try {
if (!isSocketOpen()) {
LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId));
return;
}
if (k.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
try {
handleFailedRead();
} catch (EndOfStreamException e) {
// no stacktrace. this case is very common, and it is usually not a problem.
LOG.info("{}", e.getMessage());
// expecting close to log session closure
close(e.getReason());
return;
}
}
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
if (incomingBuffer == lenBuffer) { // start of next request
incomingBuffer.flip();
isPayload = readLength(k);
incomingBuffer.clear();
} else {
// continuation
isPayload = true;
}
if (isPayload) { // not the case for 4letterword
readPayload();
} else {
// four letter words take care
// need not do anything else
return;
}
}
}
if (k.isWritable()) {
handleWrite(k);
if (!initialized && !getReadInterest() && !getWriteInterest()) {
throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE);
}
}
} catch (CancelledKeyException e) {
LOG.warn("CancelledKeyException causing close of session: 0x{}", Long.toHexString(sessionId));
LOG.debug("CancelledKeyException stack trace", e);
close(DisconnectReason.CANCELLED_KEY_EXCEPTION);
} catch (CloseRequestException e) {
// expecting close to log session closure
close();
} catch (EndOfStreamException e) {
LOG.warn("Unexpected exception", e);
// expecting close to log session closure
close(e.getReason());
} catch (ClientCnxnLimitException e) {
// Common case exception, print at debug level
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
LOG.warn("Closing session 0x{}", Long.toHexString(sessionId), e);
close(DisconnectReason.CLIENT_CNX_LIMIT);
} catch (IOException e) {
LOG.warn("Close of session 0x{}", Long.toHexString(sessionId), e);
close(DisconnectReason.IO_EXCEPTION);
}
}
protected void readRequest() throws IOException {
zkServer.processPacket(this, incomingBuffer);
}
// returns whether we are interested in writing, which is determined
// by whether we have any pending buffers on the output queue or not
private boolean getWriteInterest() {
return !outgoingBuffers.isEmpty();
}
// returns whether we are interested in taking new requests, which is
// determined by whether we are currently throttled or not
private boolean getReadInterest() {
return !throttled.get();
}
private final AtomicBoolean throttled = new AtomicBoolean(false);
// Throttle acceptance of new requests. If this entailed a state change,
// register an interest op update request with the selector.
//
// Don't support wait disable receive in NIO, ignore the parameter
public void disableRecv(boolean waitDisableRecv) {
if (throttled.compareAndSet(false, true)) {
requestInterestOpsUpdate();
}
}
// Disable throttling and resume acceptance of new requests. If this
// entailed a state change, register an interest op update request with
// the selector.
public void enableRecv() {
if (throttled.compareAndSet(true, false)) {
requestInterestOpsUpdate();
}
}
private void readConnectRequest() throws IOException, ClientCnxnLimitException {
if (!isZKServerRunning()) {
throw new IOException("ZooKeeperServer not running");
}
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
zkServer.processConnectRequest(this, request);
initialized = true;
}
/**
* This class wraps the sendBuffer method of NIOServerCnxn. It is
* responsible for chunking up the response to a client. Rather
* than cons'ing up a response fully in memory, which may be large
* for some commands, this class chunks up the result.
*/
private class SendBufferWriter extends Writer {
private StringBuffer sb = new StringBuffer();
/**
* Check if we are ready to send another chunk.
* @param force force sending, even if not a full chunk
*/
private void checkFlush(boolean force) {
if ((force && sb.length() > 0) || sb.length() > 2048) {
sendBufferSync(ByteBuffer.wrap(sb.toString().getBytes(UTF_8)));
// clear our internal buffer
sb.setLength(0);
}
}
@Override
public void close() throws IOException {
if (sb == null) {
return;
}
checkFlush(true);
sb = null; // clear out the ref to ensure no reuse
}
@Override
public void flush() throws IOException {
checkFlush(true);
}
@Override
public void write(char[] cbuf, int off, int len) throws IOException {
sb.append(cbuf, off, len);
checkFlush(false);
}
}
/** Return if four letter word found and responded to, otw false **/
private boolean checkFourLetterWord(final SelectionKey k, final int len) throws IOException {
// We take advantage of the limited size of the length to look
// for cmds. They are all 4-bytes which fits inside of an int
if (!FourLetterCommands.isKnown(len)) {
return false;
}
String cmd = FourLetterCommands.getCommandString(len);
packetReceived(4);
/** cancel the selection key to remove the socket handling
* from selector. This is to prevent netcat problem wherein
* netcat immediately closes the sending side after sending the
* commands and still keeps the receiving channel open.
* The idea is to remove the selectionkey from the selector
* so that the selector does not notice the closed read on the
* socket channel and keep the socket alive to write the data to
* and makes sure to close the socket after its done writing the data
*/
if (k != null) {
try {
k.cancel();
} catch (Exception e) {
LOG.error("Error cancelling command selection key", e);
}
}
final PrintWriter pwriter = new PrintWriter(new BufferedWriter(new SendBufferWriter()));
// ZOOKEEPER-2693: don't execute 4lw if it's not enabled.
if (!FourLetterCommands.isEnabled(cmd)) {
LOG.debug("Command {} is not executed because it is not in the whitelist.", cmd);
NopCommand nopCmd = new NopCommand(
pwriter,
this,
cmd + " is not executed because it is not in the whitelist.");
nopCmd.start();
return true;
}
LOG.info("Processing {} command from {}", cmd, sock.socket().getRemoteSocketAddress());
if (len == FourLetterCommands.setTraceMaskCmd) {
incomingBuffer = ByteBuffer.allocate(8);
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new IOException("Read error");
}
incomingBuffer.flip();
long traceMask = incomingBuffer.getLong();
ZooTrace.setTextTraceLevel(traceMask);
SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, this, traceMask);
setMask.start();
return true;
} else {
CommandExecutor commandExecutor = new CommandExecutor();
return commandExecutor.execute(this, pwriter, len, zkServer, factory);
}
}
/** Reads the first 4 bytes of lenBuffer, which could be true length or
* four letter word.
*
* @param k selection key
* @return true if length read, otw false (wasn't really the length)
* @throws IOException if buffer size exceeds maxBuffer size
*/
private boolean readLength(SelectionKey k) throws IOException {
// Read the length, now get the buffer
int len = lenBuffer.getInt();
if (!initialized && checkFourLetterWord(sk, len)) {
return false;
}
if (len < 0 || len > BinaryInputArchive.maxBuffer) {
throw new IOException("Len error. "
+ "A message from " + this.getRemoteSocketAddress() + " with advertised length of " + len
+ " is either a malformed message or too large to process"
+ " (length is greater than jute.maxbuffer=" + BinaryInputArchive.maxBuffer + ")");
}
if (!isZKServerRunning()) {
throw new IOException("ZooKeeperServer not running");
}
// checkRequestSize will throw IOException if request is rejected
zkServer.checkRequestSizeWhenReceivingMessage(len);
incomingBuffer = ByteBuffer.allocate(len);
return true;
}
/*
* (non-Javadoc)
*
* @see org.apache.zookeeper.server.ServerCnxnIface#getSessionTimeout()
*/
public int getSessionTimeout() {
return sessionTimeout;
}
/**
* Used by "dump" 4-letter command to list all connection in
* cnxnExpiryMap
*/
@Override
public String toString() {
return "ip: " + sock.socket().getRemoteSocketAddress() + " sessionId: 0x" + Long.toHexString(sessionId);
}
/**
* Close the cnxn and remove it from the factory cnxns list.
*/
@Override
public void close(DisconnectReason reason) {
disconnectReason = reason;
close();
}
private void close() {
setStale();
if (!factory.removeCnxn(this)) {
return;
}
if (zkServer != null) {
zkServer.removeCnxn(this);
}
if (sk != null) {
try {
// need to cancel this selection key from the selector
sk.cancel();
} catch (Exception e) {
LOG.debug("ignoring exception during selectionkey cancel", e);
}
}
closeSock();
}
/**
* Close resources associated with the sock of this cnxn.
*/
private void closeSock() {
if (!sock.isOpen()) {
return;
}
String logMsg = String.format(
"Closed socket connection for client %s %s",
sock.socket().getRemoteSocketAddress(),
sessionId != 0
? "which had sessionid 0x" + Long.toHexString(sessionId)
: "(no session established for client)"
);
LOG.debug(logMsg);
closeSock(sock);
}
/**
* Close resources associated with a sock.
*/
public static void closeSock(SocketChannel sock) {
if (!sock.isOpen()) {
return;
}
try {
/*
* The following sequence of code is stupid! You would think that
* only sock.close() is needed, but alas, it doesn't work that way.
* If you just do sock.close() there are cases where the socket
* doesn't actually close...
*/
sock.socket().shutdownOutput();
} catch (IOException e) {
// This is a relatively common exception that we can't avoid
LOG.debug("ignoring exception during output shutdown", e);
}
try {
sock.socket().shutdownInput();
} catch (IOException e) {
// This is a relatively common exception that we can't avoid
LOG.debug("ignoring exception during input shutdown", e);
}
try {
sock.socket().close();
} catch (IOException e) {
LOG.debug("ignoring exception during socket close", e);
}
try {
sock.close();
} catch (IOException e) {
LOG.debug("ignoring exception during socketchannel close", e);
}
}
private static final ByteBuffer packetSentinel = ByteBuffer.allocate(0);
@Override
public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat, int opCode) {
int responseSize = 0;
try {
ByteBuffer[] bb = serialize(h, r, tag, cacheKey, stat, opCode);
responseSize = bb[0].getInt();
bb[0].rewind();
sendBuffer(bb);
decrOutstandingAndCheckThrottle(h);
} catch (Exception e) {
LOG.warn("Unexpected exception. Destruction averted.", e);
}
return responseSize;
}
/*
* (non-Javadoc)
*
* @see org.apache.zookeeper.server.ServerCnxnIface#process(org.apache.zookeeper.proto.WatcherEvent)
*/
@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
// The last parameter OpCode here is used to select the response cache.
// Passing OpCode.error (with a value of -1) means we don't care, as we don't need
// response cache on delivering watcher events.
int responseSize = sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error);
ServerMetrics.getMetrics().WATCH_BYTES.add(responseSize);
}
/*
* (non-Javadoc)
*
* @see org.apache.zookeeper.server.ServerCnxnIface#getSessionId()
*/
@Override
public long getSessionId() {
return sessionId;
}
@Override
public void setSessionId(long sessionId) {
this.sessionId = sessionId;
factory.addSession(sessionId, this);
}
@Override
public void setSessionTimeout(int sessionTimeout) {
this.sessionTimeout = sessionTimeout;
factory.touchCnxn(this);
}
@Override
public int getInterestOps() {
if (!isSelectable()) {
return 0;
}
int interestOps = 0;
if (getReadInterest()) {
interestOps |= SelectionKey.OP_READ;
}
if (getWriteInterest()) {
interestOps |= SelectionKey.OP_WRITE;
}
return interestOps;
}
@Override
public InetSocketAddress getRemoteSocketAddress() {
if (!sock.isOpen()) {
return null;
}
return (InetSocketAddress) sock.socket().getRemoteSocketAddress();
}
public InetAddress getSocketAddress() {
if (!sock.isOpen()) {
return null;
}
return sock.socket().getInetAddress();
}
@Override
protected ServerStats serverStats() {
if (zkServer == null) {
return null;
}
return zkServer.serverStats();
}
@Override
public boolean isSecure() {
return false;
}
@Override
public Certificate[] getClientCertificateChain() {
throw new UnsupportedOperationException("SSL is unsupported in NIOServerCnxn");
}
@Override
public void setClientCertificateChain(Certificate[] chain) {
throw new UnsupportedOperationException("SSL is unsupported in NIOServerCnxn");
}
}