blob: eb423e338f5a36e432f4e932488a65168d4554a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.network;
import java.io.IOException;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.CancelledKeyException;
import java.security.Principal;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLPeerUnverifiedException;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
* Transport layer for SSL communication
*/
public class SslTransportLayer implements TransportLayer {
private static final Logger log = LoggerFactory.getLogger(SslTransportLayer.class);
private final String channelId;
private final SSLEngine sslEngine;
private final SelectionKey key;
private final SocketChannel socketChannel;
private final boolean enableRenegotiation;
private HandshakeStatus handshakeStatus;
private SSLEngineResult handshakeResult;
private boolean handshakeComplete = false;
private boolean closing = false;
private ByteBuffer netReadBuffer;
private ByteBuffer netWriteBuffer;
private ByteBuffer appReadBuffer;
private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
// Disable renegotiation by default until we have fixed the known issues with the existing implementation
SslTransportLayer transportLayer = new SslTransportLayer(channelId, key, sslEngine, false);
transportLayer.startHandshake();
return transportLayer;
}
// Prefer `create`, only use this in tests
SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, boolean enableRenegotiation) throws IOException {
this.channelId = channelId;
this.key = key;
this.socketChannel = (SocketChannel) key.channel();
this.sslEngine = sslEngine;
this.enableRenegotiation = enableRenegotiation;
}
/**
* starts sslEngine handshake process
*/
protected void startHandshake() throws IOException {
this.netReadBuffer = ByteBuffer.allocate(netReadBufferSize());
this.netWriteBuffer = ByteBuffer.allocate(netWriteBufferSize());
this.appReadBuffer = ByteBuffer.allocate(applicationBufferSize());
//clear & set netRead & netWrite buffers
netWriteBuffer.position(0);
netWriteBuffer.limit(0);
netReadBuffer.position(0);
netReadBuffer.limit(0);
handshakeComplete = false;
closing = false;
//initiate handshake
sslEngine.beginHandshake();
handshakeStatus = sslEngine.getHandshakeStatus();
}
@Override
public boolean ready() {
return handshakeComplete;
}
/**
* does socketChannel.finishConnect()
*/
@Override
public boolean finishConnect() throws IOException {
boolean connected = socketChannel.finishConnect();
if (connected)
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
return connected;
}
/**
* disconnects selectionKey.
*/
@Override
public void disconnect() {
key.cancel();
}
@Override
public SocketChannel socketChannel() {
return socketChannel;
}
@Override
public boolean isOpen() {
return socketChannel.isOpen();
}
@Override
public boolean isConnected() {
return socketChannel.isConnected();
}
/**
* Sends a SSL close message and closes socketChannel.
*/
@Override
public void close() throws IOException {
if (closing) return;
closing = true;
sslEngine.closeOutbound();
try {
if (isConnected()) {
if (!flush(netWriteBuffer)) {
throw new IOException("Remaining data in the network buffer, can't send SSL close message.");
}
//prep the buffer for the close message
netWriteBuffer.clear();
//perform the close, since we called sslEngine.closeOutbound
SSLEngineResult wrapResult = sslEngine.wrap(emptyBuf, netWriteBuffer);
//we should be in a close state
if (wrapResult.getStatus() != SSLEngineResult.Status.CLOSED) {
throw new IOException("Unexpected status returned by SSLEngine.wrap, expected CLOSED, received " +
wrapResult.getStatus() + ". Will not send close message to peer.");
}
netWriteBuffer.flip();
flush(netWriteBuffer);
}
} catch (IOException ie) {
log.warn("Failed to send SSL Close message ", ie);
} finally {
try {
socketChannel.socket().close();
socketChannel.close();
} finally {
key.attach(null);
key.cancel();
}
}
}
/**
* returns true if there are any pending contents in netWriteBuffer
*/
@Override
public boolean hasPendingWrites() {
return netWriteBuffer.hasRemaining();
}
/**
* Flushes the buffer to the network, non blocking
* @param buf ByteBuffer
* @return boolean true if the buffer has been emptied out, false otherwise
* @throws IOException
*/
private boolean flush(ByteBuffer buf) throws IOException {
int remaining = buf.remaining();
if (remaining > 0) {
int written = socketChannel.write(buf);
return written >= remaining;
}
return true;
}
/**
* Performs SSL handshake, non blocking.
* Before application data (kafka protocols) can be sent client & kafka broker must
* perform ssl handshake.
* During the handshake SSLEngine generates encrypted data that will be transported over socketChannel.
* Each SSLEngine operation generates SSLEngineResult , of which SSLEngineResult.handshakeStatus field is used to
* determine what operation needs to occur to move handshake along.
* A typical handshake might look like this.
* +-------------+----------------------------------+-------------+
* | client | SSL/TLS message | HSStatus |
* +-------------+----------------------------------+-------------+
* | wrap() | ClientHello | NEED_UNWRAP |
* | unwrap() | ServerHello/Cert/ServerHelloDone | NEED_WRAP |
* | wrap() | ClientKeyExchange | NEED_WRAP |
* | wrap() | ChangeCipherSpec | NEED_WRAP |
* | wrap() | Finished | NEED_UNWRAP |
* | unwrap() | ChangeCipherSpec | NEED_UNWRAP |
* | unwrap() | Finished | FINISHED |
* +-------------+----------------------------------+-------------+
*
* @throws IOException
*/
@Override
public void handshake() throws IOException {
boolean read = key.isReadable();
boolean write = key.isWritable();
handshakeComplete = false;
handshakeStatus = sslEngine.getHandshakeStatus();
if (!flush(netWriteBuffer)) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
return;
}
try {
switch (handshakeStatus) {
case NEED_TASK:
log.trace("SSLHandshake NEED_TASK channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
handshakeStatus = runDelegatedTasks();
break;
case NEED_WRAP:
log.trace("SSLHandshake NEED_WRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
handshakeResult = handshakeWrap(write);
if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
int currentNetWriteBufferSize = netWriteBufferSize();
netWriteBuffer.compact();
netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentNetWriteBufferSize);
netWriteBuffer.flip();
if (netWriteBuffer.limit() >= currentNetWriteBufferSize) {
throw new IllegalStateException("Buffer overflow when available data size (" + netWriteBuffer.limit() +
") >= network buffer size (" + currentNetWriteBufferSize + ")");
}
} else if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
throw new IllegalStateException("Should not have received BUFFER_UNDERFLOW during handshake WRAP.");
} else if (handshakeResult.getStatus() == Status.CLOSED) {
throw new EOFException();
}
log.trace("SSLHandshake NEED_WRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
//if handshake status is not NEED_UNWRAP or unable to flush netWriteBuffer contents
//we will break here otherwise we can do need_unwrap in the same call.
if (handshakeStatus != HandshakeStatus.NEED_UNWRAP || !flush(netWriteBuffer)) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
break;
}
case NEED_UNWRAP:
log.trace("SSLHandshake NEED_UNWRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
do {
handshakeResult = handshakeUnwrap(read);
if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
int currentAppBufferSize = applicationBufferSize();
appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentAppBufferSize);
if (appReadBuffer.position() > currentAppBufferSize) {
throw new IllegalStateException("Buffer underflow when available data size (" + appReadBuffer.position() +
") > packet buffer size (" + currentAppBufferSize + ")");
}
}
} while (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW);
if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
int currentNetReadBufferSize = netReadBufferSize();
netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentNetReadBufferSize);
if (netReadBuffer.position() >= currentNetReadBufferSize) {
throw new IllegalStateException("Buffer underflow when there is available data");
}
} else if (handshakeResult.getStatus() == Status.CLOSED) {
throw new EOFException("SSL handshake status CLOSED during handshake UNWRAP");
}
log.trace("SSLHandshake NEED_UNWRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
//if handshakeStatus completed than fall-through to finished status.
//after handshake is finished there is no data left to read/write in socketChannel.
//so the selector won't invoke this channel if we don't go through the handshakeFinished here.
if (handshakeStatus != HandshakeStatus.FINISHED) {
if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} else if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
break;
}
case FINISHED:
handshakeFinished();
break;
case NOT_HANDSHAKING:
handshakeFinished();
break;
default:
throw new IllegalStateException(String.format("Unexpected status [%s]", handshakeStatus));
}
} catch (SSLException e) {
handshakeFailure();
throw e;
}
}
private void renegotiate() throws IOException {
if (!enableRenegotiation)
throw new SSLHandshakeException("Renegotiation is not supported");
handshake();
}
/**
* Executes the SSLEngine tasks needed.
* @return HandshakeStatus
*/
private HandshakeStatus runDelegatedTasks() {
for (;;) {
Runnable task = delegatedTask();
if (task == null) {
break;
}
task.run();
}
return sslEngine.getHandshakeStatus();
}
/**
* Checks if the handshake status is finished
* Sets the interestOps for the selectionKey.
*/
private void handshakeFinished() throws IOException {
// SSLEngine.getHandshakeStatus is transient and it doesn't record FINISHED status properly.
// It can move from FINISHED status to NOT_HANDSHAKING after the handshake is completed.
// Hence we also need to check handshakeResult.getHandshakeStatus() if the handshake finished or not
if (handshakeResult.getHandshakeStatus() == HandshakeStatus.FINISHED) {
//we are complete if we have delivered the last package
handshakeComplete = !netWriteBuffer.hasRemaining();
//remove OP_WRITE if we are complete, otherwise we still have data to write
if (!handshakeComplete)
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
else {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
SSLSession session = sslEngine.getSession();
log.debug("SSL handshake completed successfully with peerHost '{}' peerPort {} peerPrincipal '{}' cipherSuite '{}'",
session.getPeerHost(), session.getPeerPort(), peerPrincipal(), session.getCipherSuite());
}
log.trace("SSLHandshake FINISHED channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} ",
channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
} else {
throw new IOException("NOT_HANDSHAKING during handshake");
}
}
/**
* Performs the WRAP function
* @param doWrite boolean
* @return SSLEngineResult
* @throws IOException
*/
private SSLEngineResult handshakeWrap(boolean doWrite) throws IOException {
log.trace("SSLHandshake handshakeWrap {}", channelId);
if (netWriteBuffer.hasRemaining())
throw new IllegalStateException("handshakeWrap called with netWriteBuffer not empty");
//this should never be called with a network buffer that contains data
//so we can clear it here.
netWriteBuffer.clear();
SSLEngineResult result = sslEngine.wrap(emptyBuf, netWriteBuffer);
//prepare the results to be written
netWriteBuffer.flip();
handshakeStatus = result.getHandshakeStatus();
if (result.getStatus() == SSLEngineResult.Status.OK &&
result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
handshakeStatus = runDelegatedTasks();
}
if (doWrite) flush(netWriteBuffer);
return result;
}
/**
* Perform handshake unwrap
* @param doRead boolean
* @return SSLEngineResult
* @throws IOException
*/
private SSLEngineResult handshakeUnwrap(boolean doRead) throws IOException {
log.trace("SSLHandshake handshakeUnwrap {}", channelId);
SSLEngineResult result;
if (doRead) {
int read = socketChannel.read(netReadBuffer);
if (read == -1) throw new EOFException("EOF during handshake.");
}
boolean cont;
do {
//prepare the buffer with the incoming data
netReadBuffer.flip();
result = sslEngine.unwrap(netReadBuffer, appReadBuffer);
netReadBuffer.compact();
handshakeStatus = result.getHandshakeStatus();
if (result.getStatus() == SSLEngineResult.Status.OK &&
result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
handshakeStatus = runDelegatedTasks();
}
cont = result.getStatus() == SSLEngineResult.Status.OK &&
handshakeStatus == HandshakeStatus.NEED_UNWRAP;
log.trace("SSLHandshake handshakeUnwrap: handshakeStatus {} status {}", handshakeStatus, result.getStatus());
} while (netReadBuffer.position() != 0 && cont);
return result;
}
/**
* Reads a sequence of bytes from this channel into the given buffer.
*
* @param dst The buffer into which bytes are to be transferred
* @return The number of bytes read, possible zero or -1 if the channel has reached end-of-stream
* @throws IOException if some other I/O error occurs
*/
@Override
public int read(ByteBuffer dst) throws IOException {
if (closing) return -1;
int read = 0;
if (!handshakeComplete) return read;
//if we have unread decrypted data in appReadBuffer read that into dst buffer.
if (appReadBuffer.position() > 0) {
read = readFromAppBuffer(dst);
}
if (dst.remaining() > 0) {
netReadBuffer = Utils.ensureCapacity(netReadBuffer, netReadBufferSize());
if (netReadBuffer.remaining() > 0) {
int netread = socketChannel.read(netReadBuffer);
if (netread == 0 && netReadBuffer.position() == 0) return read;
else if (netread < 0) throw new EOFException("EOF during read");
}
do {
netReadBuffer.flip();
SSLEngineResult unwrapResult = sslEngine.unwrap(netReadBuffer, appReadBuffer);
netReadBuffer.compact();
// handle ssl renegotiation.
if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && unwrapResult.getStatus() == Status.OK) {
log.trace("SSLChannel Read begin renegotiation channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
renegotiate();
break;
}
if (unwrapResult.getStatus() == Status.OK) {
read += readFromAppBuffer(dst);
} else if (unwrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
int currentApplicationBufferSize = applicationBufferSize();
appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentApplicationBufferSize);
if (appReadBuffer.position() >= currentApplicationBufferSize) {
throw new IllegalStateException("Buffer overflow when available data size (" + appReadBuffer.position() +
") >= application buffer size (" + currentApplicationBufferSize + ")");
}
// appReadBuffer will extended upto currentApplicationBufferSize
// we need to read the existing content into dst before we can do unwrap again. If there are no space in dst
// we can break here.
if (dst.hasRemaining())
read += readFromAppBuffer(dst);
else
break;
} else if (unwrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
int currentNetReadBufferSize = netReadBufferSize();
netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentNetReadBufferSize);
if (netReadBuffer.position() >= currentNetReadBufferSize) {
throw new IllegalStateException("Buffer underflow when available data size (" + netReadBuffer.position() +
") > packet buffer size (" + currentNetReadBufferSize + ")");
}
break;
} else if (unwrapResult.getStatus() == Status.CLOSED) {
// If data has been read and unwrapped, return the data. Close will be handled on the next poll.
if (appReadBuffer.position() == 0 && read == 0)
throw new EOFException();
else
break;
}
} while (netReadBuffer.position() != 0);
}
return read;
}
/**
* Reads a sequence of bytes from this channel into the given buffers.
*
* @param dsts - The buffers into which bytes are to be transferred.
* @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
* @throws IOException if some other I/O error occurs
*/
@Override
public long read(ByteBuffer[] dsts) throws IOException {
return read(dsts, 0, dsts.length);
}
/**
* Reads a sequence of bytes from this channel into a subsequence of the given buffers.
* @param dsts - The buffers into which bytes are to be transferred
* @param offset - The offset within the buffer array of the first buffer into which bytes are to be transferred; must be non-negative and no larger than dsts.length.
* @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than dsts.length - offset
* @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
* @throws IOException if some other I/O error occurs
*/
@Override
public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
throw new IndexOutOfBoundsException();
int totalRead = 0;
int i = offset;
while (i < length) {
if (dsts[i].hasRemaining()) {
int read = read(dsts[i]);
if (read > 0)
totalRead += read;
else
break;
}
if (!dsts[i].hasRemaining()) {
i++;
}
}
return totalRead;
}
/**
* Writes a sequence of bytes to this channel from the given buffer.
*
* @param src The buffer from which bytes are to be retrieved
* @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
* @throws IOException If some other I/O error occurs
*/
@Override
public int write(ByteBuffer src) throws IOException {
int written = 0;
if (closing) throw new IllegalStateException("Channel is in closing state");
if (!handshakeComplete) return written;
if (!flush(netWriteBuffer))
return written;
netWriteBuffer.clear();
SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
netWriteBuffer.flip();
//handle ssl renegotiation
if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK) {
renegotiate();
return written;
}
if (wrapResult.getStatus() == Status.OK) {
written = wrapResult.bytesConsumed();
flush(netWriteBuffer);
} else if (wrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
int currentNetWriteBufferSize = netWriteBufferSize();
netWriteBuffer.compact();
netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentNetWriteBufferSize);
netWriteBuffer.flip();
if (netWriteBuffer.limit() >= currentNetWriteBufferSize)
throw new IllegalStateException("SSL BUFFER_OVERFLOW when available data size (" + netWriteBuffer.limit() + ") >= network buffer size (" + currentNetWriteBufferSize + ")");
} else if (wrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
throw new IllegalStateException("SSL BUFFER_UNDERFLOW during write");
} else if (wrapResult.getStatus() == Status.CLOSED) {
throw new EOFException();
}
return written;
}
/**
* Writes a sequence of bytes to this channel from the subsequence of the given buffers.
*
* @param srcs The buffers from which bytes are to be retrieved
* @param offset The offset within the buffer array of the first buffer from which bytes are to be retrieved; must be non-negative and no larger than srcs.length.
* @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than srcs.length - offset.
* @return returns no.of bytes written , possibly zero.
* @throws IOException If some other I/O error occurs
*/
@Override
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
int totalWritten = 0;
int i = offset;
while (i < length) {
if (srcs[i].hasRemaining() || hasPendingWrites()) {
int written = write(srcs[i]);
if (written > 0) {
totalWritten += written;
}
}
if (!srcs[i].hasRemaining() && !hasPendingWrites()) {
i++;
} else {
// if we are unable to write the current buffer to socketChannel we should break,
// as we might have reached max socket send buffer size.
break;
}
}
return totalWritten;
}
/**
* Writes a sequence of bytes to this channel from the given buffers.
*
* @param srcs The buffers from which bytes are to be retrieved
* @return returns no.of bytes consumed by SSLEngine.wrap , possibly zero.
* @throws IOException If some other I/O error occurs
*/
@Override
public long write(ByteBuffer[] srcs) throws IOException {
return write(srcs, 0, srcs.length);
}
/**
* SSLSession's peerPrincipal for the remote host.
* @return Principal
*/
public Principal peerPrincipal() throws IOException {
try {
return sslEngine.getSession().getPeerPrincipal();
} catch (SSLPeerUnverifiedException se) {
log.debug("SSL peer is not authenticated, returning ANONYMOUS instead");
return KafkaPrincipal.ANONYMOUS;
}
}
/**
* returns a SSL Session after the handshake is established
* throws IllegalStateException if the handshake is not established
*/
public SSLSession sslSession() throws IllegalStateException {
return sslEngine.getSession();
}
/**
* Adds interestOps to SelectionKey of the TransportLayer
* @param ops SelectionKey interestOps
*/
@Override
public void addInterestOps(int ops) {
if (!key.isValid())
throw new CancelledKeyException();
else if (!handshakeComplete)
throw new IllegalStateException("handshake is not completed");
key.interestOps(key.interestOps() | ops);
}
/**
* removes interestOps to SelectionKey of the TransportLayer
* @param ops SelectionKey interestOps
*/
@Override
public void removeInterestOps(int ops) {
if (!key.isValid())
throw new CancelledKeyException();
else if (!handshakeComplete)
throw new IllegalStateException("handshake is not completed");
key.interestOps(key.interestOps() & ~ops);
}
/**
* returns delegatedTask for the SSLEngine.
*/
protected Runnable delegatedTask() {
return sslEngine.getDelegatedTask();
}
/**
* transfers appReadBuffer contents (decrypted data) into dst bytebuffer
* @param dst ByteBuffer
*/
private int readFromAppBuffer(ByteBuffer dst) {
appReadBuffer.flip();
int remaining = Math.min(appReadBuffer.remaining(), dst.remaining());
if (remaining > 0) {
int limit = appReadBuffer.limit();
appReadBuffer.limit(appReadBuffer.position() + remaining);
dst.put(appReadBuffer);
appReadBuffer.limit(limit);
}
appReadBuffer.compact();
return remaining;
}
protected int netReadBufferSize() {
return sslEngine.getSession().getPacketBufferSize();
}
protected int netWriteBufferSize() {
return sslEngine.getSession().getPacketBufferSize();
}
protected int applicationBufferSize() {
return sslEngine.getSession().getApplicationBufferSize();
}
protected ByteBuffer netReadBuffer() {
return netReadBuffer;
}
private void handshakeFailure() {
//Release all resources such as internal buffers that SSLEngine is managing
sslEngine.closeOutbound();
try {
sslEngine.closeInbound();
} catch (SSLException e) {
log.debug("SSLEngine.closeInBound() raised an exception.", e);
}
}
@Override
public boolean isMute() {
return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) == 0;
}
@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, this);
}
}