blob: a8c8eb5ca4fc42f2965c83a81f635027a1414396 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.transport.network.security.ssl;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.network.security.SSLStatus;
import org.apache.qpid.util.ByteBufferUtils;
public class SSLSender implements ByteBufferSender
{
private static final Logger LOGGER = LoggerFactory.getLogger(SSLSender.class);
private final ByteBufferSender delegate;
private final SSLEngine engine;
private final int sslBufSize;
private final ByteBuffer netData;
private final long timeout;
private final SSLStatus _sslStatus;
private String _hostname;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ConcurrentLinkedQueue<ByteBuffer> _pending = new ConcurrentLinkedQueue<>();
public SSLSender(SSLEngine engine, ByteBufferSender delegate, SSLStatus sslStatus)
{
this.engine = engine;
this.delegate = delegate;
sslBufSize = engine.getSession().getPacketBufferSize();
netData = ByteBuffer.allocate(sslBufSize);
timeout = Long.getLong("qpid.ssl_timeout", 60000);
_sslStatus = sslStatus;
}
public void setHostname(String hostname)
{
_hostname = hostname;
}
public void close()
{
if (!closed.getAndSet(true))
{
if (engine.isOutboundDone())
{
return;
}
LOGGER.debug("Closing SSL connection");
engine.closeOutbound();
try
{
tearDownSSLConnection();
}
catch(Exception e)
{
throw new SenderException("Error closing SSL connection",e);
}
synchronized(_sslStatus.getSslLock())
{
while (!engine.isOutboundDone())
{
try
{
_sslStatus.getSslLock().wait();
}
catch(InterruptedException e)
{
// pass
}
}
}
delegate.close();
}
}
private void tearDownSSLConnection() throws Exception
{
SSLEngineResult result = ByteBufferUtils.encryptSSL(engine,
Collections.singletonList(ByteBuffer.allocate(0)),
netData);
Status status = result.getStatus();
int read = result.bytesProduced();
while (status != Status.CLOSED)
{
if (status == Status.BUFFER_OVERFLOW)
{
netData.clear();
}
if(read > 0)
{
int limit = netData.limit();
netData.limit(netData.position());
netData.position(netData.position() - read);
ByteBuffer data = netData.slice();
netData.limit(limit);
netData.position(netData.position() + read);
delegate.send(data);
flush();
}
result = ByteBufferUtils.encryptSSL(engine,
Collections.singletonList(ByteBuffer.allocate(0)),
netData);
status = result.getStatus();
read = result.bytesProduced();
}
}
public void flush()
{
doSend();
delegate.flush();
}
public void send(ByteBuffer appData)
{
_pending.add(appData.duplicate());
}
public void doSend()
{
if (closed.get() && !_sslStatus.getSslErrorFlag())
{
throw new SenderException("SSL Sender is closed");
}
HandshakeStatus handshakeStatus;
Status status;
while(!_pending.isEmpty() && !_sslStatus.getSslErrorFlag())
{
int read = 0;
try
{
SSLEngineResult result = ByteBufferUtils.encryptSSL(engine, _pending, netData);
while(!_pending.isEmpty())
{
ByteBuffer buf = _pending.peek();
if (buf.hasRemaining())
{
break;
}
_pending.poll();
}
read = result.bytesProduced();
status = result.getStatus();
handshakeStatus = result.getHandshakeStatus();
}
catch(SSLException e)
{
// Should this set _sslError??
throw new SenderException("SSL, Error occurred while encrypting data",e);
}
if(read > 0)
{
int limit = netData.limit();
netData.limit(netData.position());
netData.position(netData.position() - read);
ByteBuffer data = netData.slice();
netData.limit(limit);
netData.position(netData.position() + read);
delegate.send(data);
}
switch(status)
{
case CLOSED:
throw new SenderException("SSLEngine is closed");
case BUFFER_OVERFLOW:
netData.clear();
continue;
case OK:
break; // do nothing
default:
throw new IllegalStateException("SSLReceiver: Invalid State " + status);
}
switch (handshakeStatus)
{
case NEED_WRAP:
if (netData.hasRemaining())
{
continue;
}
break;
case NEED_TASK:
doTasks();
break;
case NEED_UNWRAP:
delegate.flush();
synchronized(_sslStatus.getSslLock())
{
if (_sslStatus.getSslErrorFlag())
{
break;
}
switch (engine.getHandshakeStatus())
{
case NEED_UNWRAP:
final long start = System.currentTimeMillis();
try
{
_sslStatus.getSslLock().wait(timeout);
}
catch(InterruptedException e)
{
// pass
}
if (!_sslStatus.getSslErrorFlag() && System.currentTimeMillis() - start >= timeout)
{
throw new SenderException(
"SSL Engine timed out after waiting " + timeout + "ms. for a response." +
"To get more info,run with -Djavax.net.debug=ssl");
}
break;
}
}
break;
case FINISHED:
if (_hostname != null)
{
SSLUtil.verifyHostname(engine, _hostname);
}
break;
case NOT_HANDSHAKING:
break; //do nothing
default:
throw new IllegalStateException("SSLSender: Invalid State " + status);
}
}
}
private void doTasks()
{
Runnable runnable;
while ((runnable = engine.getDelegatedTask()) != null) {
runnable.run();
}
}
}