Merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1655057 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 2e7f3ee..6587bc7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -64,12 +64,12 @@
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
public abstract class AbstractExchange<T extends AbstractExchange<T>>
extends AbstractConfiguredObject<T>
@@ -510,7 +510,7 @@
{
if (_virtualHost.getState() != State.ACTIVE)
{
- throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+ throw new VirtualHostUnavailableException(this._virtualHost);
}
List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
@@ -910,4 +910,5 @@
this.getVirtualHost().getDurableConfigurationStore().update(false, asObjectRecord());
}
}
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
deleted file mode 100644
index be3a13d..0000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.flow;
-
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class BytesOnlyCreditManager extends AbstractFlowCreditManager
-{
- private final AtomicLong _bytesCredit;
-
- public BytesOnlyCreditManager(long initialCredit)
- {
- _bytesCredit = new AtomicLong(initialCredit);
- }
-
- public long getMessageCredit()
- {
- return -1L;
- }
-
- public long getBytesCredit()
- {
- return _bytesCredit.get();
- }
-
- public void restoreCredit(long messageCredit, long bytesCredit)
- {
- _bytesCredit.addAndGet(bytesCredit);
- setSuspended(false);
- }
-
- public void removeAllCredit()
- {
- _bytesCredit.set(0L);
- }
-
- public boolean hasCredit()
- {
- return _bytesCredit.get() > 0L;
- }
-
- public boolean useCreditForMessage(long msgSize)
- {
- if(hasCredit())
- {
- if(_bytesCredit.addAndGet(-msgSize) >= 0)
- {
- return true;
- }
- else
- {
- _bytesCredit.addAndGet(msgSize);
- setSuspended(true);
- return false;
- }
- }
- else
- {
- return false;
- }
-
- }
-
- public void setBytesCredit(long bytesCredit)
- {
- _bytesCredit.set( bytesCredit );
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
index 280f285..08aac0b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
@@ -24,10 +24,6 @@
public interface FlowCreditManager
{
- long getMessageCredit();
-
- long getBytesCredit();
-
public static interface FlowCreditManagerListener
{
void creditStateChanged(boolean hasCredit);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
deleted file mode 100644
index 31c1fda..0000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.flow;
-
-
-public class MessageAndBytesCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
-{
- private long _messageCredit;
- private long _bytesCredit;
-
- public MessageAndBytesCreditManager(final long messageCredit, final long bytesCredit)
- {
- _messageCredit = messageCredit;
- _bytesCredit = bytesCredit;
- }
-
- public synchronized long getMessageCredit()
- {
- return _messageCredit;
- }
-
- public synchronized long getBytesCredit()
- {
- return _bytesCredit;
- }
-
- public synchronized void restoreCredit(long messageCredit, long bytesCredit)
- {
- _messageCredit += messageCredit;
- _bytesCredit += bytesCredit;
- setSuspended(hasCredit());
- }
-
- public synchronized void removeAllCredit()
- {
- _messageCredit = 0L;
- _bytesCredit = 0L;
- setSuspended(true);
- }
-
- public synchronized boolean hasCredit()
- {
- return (_messageCredit > 0L) && ( _bytesCredit > 0L );
- }
-
- public synchronized boolean useCreditForMessage(final long msgSize)
- {
- if(_messageCredit == 0L)
- {
- setSuspended(true);
- return false;
- }
- else
- {
- if(msgSize > _bytesCredit)
- {
- setSuspended(true);
- return false;
- }
- _messageCredit--;
- _bytesCredit -= msgSize;
- setSuspended(false);
- return true;
- }
-
- }
-
- public synchronized void setBytesCredit(long bytesCredit)
- {
- _bytesCredit = bytesCredit;
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index f13af47..40aa1bb 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -113,4 +113,6 @@
* @return the time of the last activity or 0 if not in a transaction
*/
long getTransactionUpdateTime();
+
+ void transportStateChanged();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index dd5e01e..3c25e09 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -24,12 +24,8 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.security.Principal;
import java.util.Set;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
@@ -43,21 +39,14 @@
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender;
-import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
private final long _id;
- private final SSLContext _sslContext;
- private final boolean _wantClientAuth;
- private final boolean _needClientAuth;
private final AmqpPort<?> _port;
- private final Transport _transport;
+ private Transport _transport;
private final ProtocolEngineCreator[] _creators;
private final Runnable _onCloseTask;
@@ -71,9 +60,6 @@
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
public MultiVersionProtocolEngine(final Broker<?> broker,
- SSLContext sslContext,
- boolean wantClientAuth,
- boolean needClientAuth,
final Set<Protocol> supported,
final Protocol defaultSupportedReply,
AmqpPort<?> port,
@@ -92,16 +78,12 @@
_broker = broker;
_supported = supported;
_defaultSupportedReply = defaultSupportedReply;
- _sslContext = sslContext;
- _wantClientAuth = wantClientAuth;
- _needClientAuth = needClientAuth;
_port = port;
_transport = transport;
_creators = creators;
_onCloseTask = onCloseTask;
}
-
public SocketAddress getRemoteAddress()
{
return _delegate.getRemoteAddress();
@@ -147,6 +129,12 @@
_delegate.readerIdle();
}
+ @Override
+ public void encryptedTransport()
+ {
+ _delegate.encryptedTransport();
+ }
+
public void received(ByteBuffer msg)
{
@@ -169,6 +157,18 @@
return _delegate.getSubject();
}
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _delegate.isTransportBlockedForWriting();
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _delegate.setTransportBlockedForWriting(blocked);
+ }
+
private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
@@ -247,6 +247,12 @@
}
+ @Override
+ public void encryptedTransport()
+ {
+
+ }
+
public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
{
@@ -274,12 +280,23 @@
{
return new Subject();
}
+
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return false;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ }
}
private class SelfDelegateProtocolEngine implements ServerProtocolEngine
{
private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
- private long _lastReadTime;
+ private long _lastReadTime = System.currentTimeMillis();
public SocketAddress getRemoteAddress()
{
@@ -360,15 +377,6 @@
}
}
-
- if(newDelegate == null && looksLikeSSL(headerBytes))
- {
- if(_sslContext != null)
- {
- newDelegate = new SslDelegateProtocolEngine();
- }
- }
-
// If no delegate is found then send back a supported protocol version id
if(newDelegate == null)
{
@@ -423,6 +431,17 @@
return _delegate.getSubject();
}
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return false;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ }
+
public void exception(Throwable t)
{
_logger.error("Error establishing session", t);
@@ -466,6 +485,15 @@
_network.close();
}
+ @Override
+ public void encryptedTransport()
+ {
+ if(_transport == Transport.TCP)
+ {
+ _transport = Transport.SSL;
+ }
+ }
+
public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
{
@@ -484,250 +512,5 @@
}
}
- private class SslDelegateProtocolEngine implements ServerProtocolEngine
- {
- private final MultiVersionProtocolEngine _decryptEngine;
- private final SSLEngine _engine;
- private final SSLReceiver _sslReceiver;
- private final SSLBufferingSender _sslSender;
- private long _lastReadTime;
- private SslDelegateProtocolEngine()
- {
-
- _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported,
- _defaultSupportedReply, _port, Transport.SSL, _id, _creators,
- null);
-
- _engine = _sslContext.createSSLEngine();
- _engine.setUseClientMode(false);
- SSLUtil.removeSSLv3Support(_engine);
-
- if(_needClientAuth)
- {
- _engine.setNeedClientAuth(true);
- }
- else if(_wantClientAuth)
- {
- _engine.setWantClientAuth(true);
- }
-
- SSLStatus sslStatus = new SSLStatus();
- _sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus);
- _sslSender = new SSLBufferingSender(_engine,_sender,sslStatus);
- _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender), _sslSender);
- }
-
- @Override
- public void received(ByteBuffer msg)
- {
- _lastReadTime = System.currentTimeMillis();
- _sslReceiver.received(msg);
- _sslSender.send();
- _sslSender.flush();
- }
-
- @Override
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
- {
- //TODO - Implement
- }
-
- @Override
- public SocketAddress getRemoteAddress()
- {
- return _decryptEngine.getRemoteAddress();
- }
-
- @Override
- public SocketAddress getLocalAddress()
- {
- return _decryptEngine.getLocalAddress();
- }
-
- @Override
- public long getWrittenBytes()
- {
- return _decryptEngine.getWrittenBytes();
- }
-
- @Override
- public long getReadBytes()
- {
- return _decryptEngine.getReadBytes();
- }
-
- @Override
- public void closed()
- {
- _decryptEngine.closed();
- }
-
- @Override
- public void writerIdle()
- {
- _decryptEngine.writerIdle();
- }
-
- @Override
- public void readerIdle()
- {
- _decryptEngine.readerIdle();
- }
-
- @Override
- public void exception(Throwable t)
- {
- _decryptEngine.exception(t);
- }
-
- @Override
- public long getConnectionId()
- {
- return _decryptEngine.getConnectionId();
- }
-
- @Override
- public Subject getSubject()
- {
- return _decryptEngine.getSubject();
- }
-
- @Override
- public long getLastReadTime()
- {
- return _lastReadTime;
- }
-
- @Override
- public long getLastWriteTime()
- {
- return _decryptEngine.getLastWriteTime();
- }
- }
-
- private boolean looksLikeSSL(byte[] headerBytes)
- {
- return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
- }
-
- private boolean looksLikeSSLv3ClientHello(byte[] headerBytes)
- {
- return headerBytes[0] == 22 && // SSL Handshake
- (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[2] == 0 || // SSL 3.0
- headerBytes[2] == 1 || // TLS 1.0
- headerBytes[2] == 2 || // TLS 1.1
- headerBytes[2] == 3)) && // TLS1.2
- (headerBytes[5] == 1); // client_hello
- }
-
- private boolean looksLikeSSLv2ClientHello(byte[] headerBytes)
- {
- return headerBytes[0] == -128 &&
- headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[4] == 0 || // SSL 3.0
- headerBytes[4] == 1 || // TLS 1.0
- headerBytes[4] == 2 || // TLS 1.1
- headerBytes[4] == 3);
- }
-
-
- private static class SSLNetworkConnection implements NetworkConnection
- {
- private final NetworkConnection _network;
- private final SSLBufferingSender _sslSender;
- private final SSLEngine _engine;
- private Principal _principal;
- private boolean _principalChecked;
- private final Object _lock = new Object();
-
- public SSLNetworkConnection(SSLEngine engine, NetworkConnection network,
- SSLBufferingSender sslSender)
- {
- _engine = engine;
- _network = network;
- _sslSender = sslSender;
-
- }
-
- @Override
- public Sender<ByteBuffer> getSender()
- {
- return _sslSender;
- }
-
- @Override
- public void start()
- {
- _network.start();
- }
-
- @Override
- public void close()
- {
- _sslSender.close();
-
- _network.close();
- }
-
- @Override
- public SocketAddress getRemoteAddress()
- {
- return _network.getRemoteAddress();
- }
-
- @Override
- public SocketAddress getLocalAddress()
- {
- return _network.getLocalAddress();
- }
-
- @Override
- public void setMaxWriteIdle(int sec)
- {
- _network.setMaxWriteIdle(sec);
- }
-
- @Override
- public void setMaxReadIdle(int sec)
- {
- _network.setMaxReadIdle(sec);
- }
-
- @Override
- public Principal getPeerPrincipal()
- {
- synchronized (_lock)
- {
- if(!_principalChecked)
- {
- try
- {
- _principal = _engine.getSession().getPeerPrincipal();
- }
- catch (SSLPeerUnverifiedException e)
- {
- _principal = null;
- }
-
- _principalChecked = true;
- }
-
- return _principal;
- }
- }
-
- @Override
- public int getMaxReadIdle()
- {
- return _network.getMaxReadIdle();
- }
-
- @Override
- public int getMaxWriteIdle()
- {
- return _network.getMaxWriteIdle();
- }
- }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
index 5c704c5..a51717e 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
@@ -27,10 +27,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import javax.net.ssl.SSLContext;
-
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.PortMessages;
import org.apache.qpid.server.logging.subjects.PortLogSubject;
import org.apache.qpid.server.model.Broker;
@@ -48,9 +45,6 @@
private final Broker<?> _broker;
private final Set<Protocol> _supported;
private final Protocol _defaultSupportedReply;
- private final SSLContext _sslContext;
- private final boolean _wantClientAuth;
- private final boolean _needClientAuth;
private final AmqpPort<?> _port;
private final Transport _transport;
private final ProtocolEngineCreator[] _creators;
@@ -58,9 +52,6 @@
_connectionCountDecrementingTask = new ConnectionCountDecrementingTask();
public MultiVersionProtocolEngineFactory(Broker<?> broker,
- SSLContext sslContext,
- boolean wantClientAuth,
- boolean needClientAuth,
final Set<Protocol> supportedVersions,
final Protocol defaultSupportedReply,
AmqpPort<?> port,
@@ -73,7 +64,6 @@
}
_broker = broker;
- _sslContext = sslContext;
_supported = supportedVersions;
_defaultSupportedReply = defaultSupportedReply;
final List<ProtocolEngineCreator> creators = new ArrayList<ProtocolEngineCreator>();
@@ -83,18 +73,16 @@
}
Collections.sort(creators, new ProtocolEngineCreatorComparator());
_creators = creators.toArray(new ProtocolEngineCreator[creators.size()]);
- _wantClientAuth = wantClientAuth;
- _needClientAuth = needClientAuth;
_port = port;
_transport = transport;
}
- public ServerProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
+ public MultiVersionProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
{
if(_port.canAcceptNewConnection(remoteSocketAddress))
{
_port.incrementConnectionCount();
- return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth,
+ return new MultiVersionProtocolEngine(_broker,
_supported, _defaultSupportedReply, _port, _transport,
ID_GENERATOR.getAndIncrement(),
_creators, _connectionCountDecrementingTask);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index f905558..26cbf37 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -50,6 +50,7 @@
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogMessage;
@@ -2522,7 +2523,7 @@
{
if (_virtualHost.getState() != State.ACTIVE)
{
- throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+ throw new VirtualHostUnavailableException(this._virtualHost);
}
if(!message.isReferenced(this))
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
index b1f6b84..2fd10e4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
@@ -23,18 +23,20 @@
import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
import java.net.InetSocketAddress;
+import java.util.EnumSet;
import java.util.Set;
import javax.net.ssl.SSLContext;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.TransportEncryption;
+import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport;
class TCPandSSLTransport implements AcceptingTransport
{
@@ -78,17 +80,25 @@
}
final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration();
- _networkTransport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance();
+ _networkTransport = new NonBlockingNetworkTransport();
final MultiVersionProtocolEngineFactory protocolEngineFactory =
new MultiVersionProtocolEngineFactory(
- _port.getParent(Broker.class), _transports.contains(Transport.TCP) ? _sslContext : null,
- settings.wantClientAuth(), settings.needClientAuth(),
+ _port.getParent(Broker.class),
_supported,
_defaultSupportedProtocolReply,
_port,
_transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL);
- _networkTransport.accept(settings, protocolEngineFactory, _transports.contains(Transport.TCP) ? null : _sslContext);
+ EnumSet<TransportEncryption> encryptionSet = EnumSet.noneOf(TransportEncryption.class);
+ if(_transports.contains(Transport.TCP))
+ {
+ encryptionSet.add(TransportEncryption.NONE);
+ }
+ if(_transports.contains(Transport.SSL))
+ {
+ encryptionSet.add(TransportEncryption.TLS);
+ }
+ _networkTransport.accept(settings, protocolEngineFactory, _sslContext, encryptionSet);
}
public int getAcceptingPort()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
new file mode 100644
index 0000000..a0bab0b
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+
+public class VirtualHostUnavailableException extends ConnectionScopedRuntimeException
+{
+ public VirtualHostUnavailableException(VirtualHostImpl<?, ?, ?> host)
+ {
+ super("Virtualhost state "
+ + host.getState()
+ + " prevents the message from being sent");
+ }
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 1c42d9b..47ed224 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -456,6 +456,12 @@
{
return 0;
}
+
+ @Override
+ public void transportStateChanged()
+ {
+
+ }
}
private static class MockConnectionModel implements AMQConnectionModel
@@ -663,5 +669,7 @@
{
}
+
+
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 89d6811..afa4fb8 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -158,6 +158,10 @@
return _name;
}
+ public void transportStateChanged()
+ {
+ _creditManager.restoreCredit(0, 0);
+ }
public static class AddMessageDispositionListenerAction implements Runnable
{
@@ -555,10 +559,10 @@
switch(flowMode)
{
case CREDIT:
- _creditManager = new CreditCreditManager(0l,0l);
+ _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
break;
case WINDOW:
- _creditManager = new WindowCreditManager(0l,0l);
+ _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
break;
default:
// this should never happen, as 0-10 is finalised and so the enum should never change
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
index 8dddac9..e670c1f 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
@@ -21,48 +21,27 @@
package org.apache.qpid.server.protocol.v0_10;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.flow.AbstractFlowCreditManager;
public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
+ private final ServerProtocolEngine _serverProtocolEngine;
private volatile long _bytesCredit;
private volatile long _messageCredit;
- public CreditCreditManager(long bytesCredit, long messageCredit)
+ public CreditCreditManager(long bytesCredit, long messageCredit, final ServerProtocolEngine serverProtocolEngine)
{
+ _serverProtocolEngine = serverProtocolEngine;
_bytesCredit = bytesCredit;
_messageCredit = messageCredit;
setSuspended(!hasCredit());
}
-
- public synchronized void setCreditLimits(final long bytesCredit, final long messageCredit)
- {
- _bytesCredit = bytesCredit;
- _messageCredit = messageCredit;
-
- setSuspended(!hasCredit());
-
- }
-
-
- public long getMessageCredit()
- {
- return _messageCredit == -1L
- ? Long.MAX_VALUE
- : _messageCredit;
- }
-
- public long getBytesCredit()
- {
- return _bytesCredit == -1L
- ? Long.MAX_VALUE
- : _bytesCredit;
- }
-
public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
{
+ setSuspended(!hasCredit());
}
@@ -107,12 +86,17 @@
public synchronized boolean hasCredit()
{
// Note !=, if credit is < 0 that indicates infinite credit
- return (_bytesCredit != 0L && _messageCredit != 0L);
+ return (_bytesCredit != 0L && _messageCredit != 0L && !_serverProtocolEngine.isTransportBlockedForWriting());
}
public synchronized boolean useCreditForMessage(long msgSize)
{
- if(_messageCredit >= 0L)
+ if (_serverProtocolEngine.isTransportBlockedForWriting())
+ {
+ setSuspended(true);
+ return false;
+ }
+ else if(_messageCredit >= 0L)
{
if(_messageCredit > 0)
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
index 30aecdb..5c91925 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
@@ -86,7 +86,10 @@
conn.setRemoteAddress(network.getRemoteAddress());
conn.setLocalAddress(network.getLocalAddress());
- return new ProtocolEngine_0_10( conn, network);
+ ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn, network);
+ conn.setProtocolEngine(protocolEngine);
+
+ return protocolEngine;
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 854cd38..58b6a3a 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -52,8 +52,9 @@
private ServerConnection _connection;
private long _createTime = System.currentTimeMillis();
- private long _lastReadTime;
- private long _lastWriteTime;
+ private long _lastReadTime = _createTime;
+ private long _lastWriteTime = _createTime;
+ private volatile boolean _transportBlockedForWriting;
public ProtocolEngine_0_10(ServerConnection conn,
NetworkConnection network)
@@ -101,17 +102,13 @@
return new Sender<ByteBuffer>()
{
@Override
- public void setIdleTimeout(int i)
- {
- sender.setIdleTimeout(i);
-
- }
-
- @Override
public void send(ByteBuffer msg)
{
_lastWriteTime = System.currentTimeMillis();
- sender.send(msg);
+ ByteBuffer copy = ByteBuffer.wrap(new byte[msg.remaining()]);
+ copy.put(msg);
+ copy.flip();
+ sender.send(copy);
}
@@ -190,6 +187,11 @@
return _writtenBytes;
}
+ @Override
+ public void encryptedTransport()
+ {
+ }
+
public void writerIdle()
{
_connection.doHeartBeat();
@@ -215,11 +217,6 @@
return getRemoteAddress().toString();
}
- public String getAuthId()
- {
- return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName();
- }
-
public boolean isDurable()
{
return false;
@@ -246,4 +243,18 @@
{
return _connection.getAuthorizedSubject();
}
+
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _transportBlockedForWriting;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _transportBlockedForWriting = blocked;
+ _connection.transportStateChanged();
+ }
+
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 8567be3..cbd569d 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -37,6 +37,7 @@
import javax.security.auth.Subject;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
@@ -90,6 +91,8 @@
private int _messageCompressionThreshold;
private int _maxMessageSize;
+ private ServerProtocolEngine _serverProtocolEngine;
+
public ServerConnection(final long connectionId,
Broker<?> broker,
final AmqpPort<?> port,
@@ -189,6 +192,16 @@
super.setConnectionDelegate(delegate);
}
+ public ServerProtocolEngine getProtocolEngine()
+ {
+ return _serverProtocolEngine;
+ }
+
+ public void setProtocolEngine(final ServerProtocolEngine serverProtocolEngine)
+ {
+ _serverProtocolEngine = serverProtocolEngine;
+ }
+
public VirtualHostImpl<?,?,?> getVirtualHost()
{
return _virtualHost;
@@ -664,4 +677,12 @@
{
return _maxMessageSize;
}
+
+ public void transportStateChanged()
+ {
+ for (AMQSessionModel ssn : getSessionModels())
+ {
+ ssn.transportStateChanged();
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 223de4f..1d8676e 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -56,6 +56,7 @@
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -874,6 +875,15 @@
}
@Override
+ public void transportStateChanged()
+ {
+ for(ConsumerTarget_0_10 consumerTarget : getSubscriptions())
+ {
+ consumerTarget.transportStateChanged();
+ }
+ }
+
+ @Override
public Object getConnectionReference()
{
return getConnection().getReference();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 288a4f9..8fdee7a 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -35,8 +35,10 @@
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -243,8 +245,8 @@
}
else
{
-
- FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
+ ServerProtocolEngine serverProtocolEngine = getServerConnection(session).getProtocolEngine();
+ FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L, serverProtocolEngine);
FilterManager filterManager = null;
try
@@ -381,58 +383,69 @@
new MessageTransferMessage(storeMessage, serverSession.getReference());
MessageReference<MessageTransferMessage> reference = message.newReference();
- final InstanceProperties instanceProperties = new InstanceProperties()
+ try
{
- @Override
- public Object getProperty(final Property prop)
+ final InstanceProperties instanceProperties = new InstanceProperties()
{
- switch (prop)
+ @Override
+ public Object getProperty(final Property prop)
{
- case EXPIRATION:
- return message.getExpiration();
- case IMMEDIATE:
- return message.isImmediate();
- case MANDATORY:
- return (delvProps == null || !delvProps.getDiscardUnroutable())
- && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
- case PERSISTENT:
- return message.isPersistent();
- case REDELIVERED:
- return delvProps.getRedelivered();
+ switch (prop)
+ {
+ case EXPIRATION:
+ return message.getExpiration();
+ case IMMEDIATE:
+ return message.isImmediate();
+ case MANDATORY:
+ return (delvProps == null || !delvProps.getDiscardUnroutable())
+ && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
+ case PERSISTENT:
+ return message.isPersistent();
+ case REDELIVERED:
+ return delvProps != null && delvProps.getRedelivered(); // delvProps may be null (see MANDATORY); report not redelivered
+ }
+ return null;
}
- return null;
- }
- };
+ };
- int enqueues = serverSession.enqueue(message, instanceProperties, destination);
+ int enqueues = serverSession.enqueue(message, instanceProperties, destination);
- if (enqueues == 0)
- {
- if ((delvProps == null || !delvProps.getDiscardUnroutable())
- && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ if (enqueues == 0)
{
- RangeSet rejects = RangeSetFactory.createRangeSet();
- rejects.add(xfr.getId());
- MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
- ssn.invoke(reject);
+ if ((delvProps == null || !delvProps.getDiscardUnroutable())
+ && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ {
+ RangeSet rejects = RangeSetFactory.createRangeSet();
+ rejects.add(xfr.getId());
+ MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
+ ssn.invoke(reject);
+ }
+ else
+ {
+ virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(),
+ messageMetaData.getRoutingKey()));
+ }
+ }
+
+ if (serverSession.isTransactional())
+ {
+ serverSession.processed(xfr);
}
else
{
- virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(),
- messageMetaData.getRoutingKey()));
+ serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
+ new CommandProcessedAction(serverSession, xfr));
}
}
+ catch (VirtualHostUnavailableException e)
+ {
+ getServerConnection(serverSession).close(AMQConstant.CONNECTION_FORCED, e.getMessage());
+ }
+ finally
+ {
+ reference.release();
+ }
- if (serverSession.isTransactional())
- {
- serverSession.processed(xfr);
- }
- else
- {
- serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
- new CommandProcessedAction(serverSession, xfr));
- }
- reference.release();
}
}
@@ -549,7 +562,7 @@
{
try
{
- ((ServerSession)session).endDtx(method.getXid(), method.getFail(), method.getSuspend());
+ ((ServerSession) session).endDtx(method.getXid(), method.getFail(), method.getSuspend());
}
catch (TimeoutDtxException e)
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
index 8e48741..e11d2ce 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
@@ -21,11 +21,14 @@
package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.flow.AbstractFlowCreditManager;
public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class);
+ private final ServerProtocolEngine _serverProtocolEngine;
private volatile long _bytesCreditLimit;
private volatile long _messageCreditLimit;
@@ -33,39 +36,22 @@
private volatile long _bytesUsed;
private volatile long _messageUsed;
- public WindowCreditManager()
- {
- this(0L, 0L);
- }
-
- public WindowCreditManager(long bytesCreditLimit, long messageCreditLimit)
+ public WindowCreditManager(long bytesCreditLimit,
+ long messageCreditLimit,
+ ServerProtocolEngine serverProtocolEngine)
{
+ _serverProtocolEngine = serverProtocolEngine;
_bytesCreditLimit = bytesCreditLimit;
_messageCreditLimit = messageCreditLimit;
setSuspended(!hasCredit());
}
- public long getBytesCreditLimit()
- {
- return _bytesCreditLimit;
- }
-
public long getMessageCreditLimit()
{
return _messageCreditLimit;
}
- public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
- {
- _bytesCreditLimit = bytesCreditLimit;
- _messageCreditLimit = messageCreditLimit;
-
- setSuspended(!hasCredit());
-
- }
-
-
public long getMessageCredit()
{
return _messageCreditLimit == -1L
@@ -121,12 +107,18 @@
public synchronized boolean hasCredit()
{
return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed)
- && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed);
+ && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed)
+ && !_serverProtocolEngine.isTransportBlockedForWriting();
}
public synchronized boolean useCreditForMessage(final long msgSize)
{
- if(_messageCreditLimit >= 0L)
+ if (_serverProtocolEngine.isTransportBlockedForWriting())
+ {
+ setSuspended(true);
+ return false;
+ }
+ else if(_messageCreditLimit >= 0L)
{
if(_messageUsed < _messageCreditLimit)
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
index 1c4a694..b05edc5 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
@@ -20,17 +20,25 @@
*/
package org.apache.qpid.server.protocol.v0_10;
-import org.apache.qpid.server.protocol.v0_10.WindowCreditManager;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.test.utils.QpidTestCase;
public class WindowCreditManagerTest extends QpidTestCase
{
private WindowCreditManager _creditManager;
+ private ServerProtocolEngine _protocolEngine;
protected void setUp() throws Exception
{
super.setUp();
- _creditManager = new WindowCreditManager();
+
+ _protocolEngine = mock(ServerProtocolEngine.class);
+ when(_protocolEngine.isTransportBlockedForWriting()).thenReturn(false);
+
+ _creditManager = new WindowCreditManager(0l, 0l, _protocolEngine);
}
/**
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index d52fb73..f7f65e2 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -66,8 +66,6 @@
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.flow.MessageOnlyCreditManager;
-import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -133,7 +131,8 @@
private final int _channelId;
- private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l);
+ private final Pre0_10CreditManager _creditManager;
+ private final FlowCreditManager _noAckCreditManager;
/**
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
@@ -213,6 +212,9 @@
public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
{
+ _creditManager = new Pre0_10CreditManager(0l,0l, connection);
+ _noAckCreditManager = new NoAckCreditManager(connection);
+
_connection = connection;
_channelId = channelId;
@@ -699,7 +701,7 @@
if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
{
- target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+ target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _noAckCreditManager);
}
else if(acks)
{
@@ -709,7 +711,7 @@
}
else
{
- target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+ target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _noAckCreditManager);
options.add(ConsumerImpl.Option.ACQUIRES);
options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
@@ -1633,6 +1635,7 @@
}
}
+
public synchronized void block(AMQQueue queue)
{
if(_blockingEntities.add(queue))
@@ -1661,6 +1664,13 @@
}
@Override
+ public void transportStateChanged()
+ {
+ _creditManager.restoreCredit(0, 0);
+ _noAckCreditManager.restoreCredit(0, 0);
+ }
+
+ @Override
public Object getConnectionReference()
{
return getConnection().getReference();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 4212505..cea9b09 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -170,7 +170,7 @@
private Sender<ByteBuffer> _sender;
private volatile boolean _deferFlush;
- private long _lastReceivedTime;
+ private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want?
private boolean _blocking;
private final ReentrantLock _receivedLock;
@@ -188,6 +188,7 @@
private int _currentMethodId;
private int _binaryDataLimit;
private long _maxMessageSize;
+ private volatile boolean _transportBlockedForWriting;
public AMQProtocolEngine(Broker<?> broker,
final NetworkConnection network,
@@ -250,6 +251,22 @@
return _authorizedSubject;
}
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _transportBlockedForWriting;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _transportBlockedForWriting = blocked;
+ for(AMQChannel channel : _channelMap.values())
+ {
+ channel.transportStateChanged();
+ }
+ }
+
public void setNetworkConnection(NetworkConnection network)
{
setNetworkConnection(network, network.getSender());
@@ -514,20 +531,10 @@
throw new ServerScopedRuntimeException(e);
}
- final ByteBuffer buf;
-
- if(size <= REUSABLE_BYTE_BUFFER_CAPACITY)
- {
- buf = _reusableByteBuffer;
- buf.position(0);
- }
- else
- {
- buf = ByteBuffer.wrap(data);
- }
- buf.limit(_reusableDataOutput.length());
-
- return buf;
+ final ByteBuffer copy = ByteBuffer.allocate(_reusableDataOutput.length());
+ copy.put(data, 0, _reusableDataOutput.length());
+ copy.flip();
+ return copy;
}
@@ -1160,6 +1167,11 @@
}
}
+ @Override
+ public void encryptedTransport()
+ {
+ }
+
public void readerIdle()
{
Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>()
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 43982db..d6642ae 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -136,12 +136,6 @@
}
- @Override
- public boolean allocateCredit(ServerMessage msg)
- {
- return true;
- }
-
}
public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
@@ -215,12 +209,6 @@
}
- @Override
- public boolean allocateCredit(ServerMessage msg)
- {
- return true;
- }
-
private static final ServerTransaction.Action NOOP =
new ServerTransaction.Action()
{
@@ -250,11 +238,6 @@
super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
- public boolean allocateCredit(ServerMessage msg)
- {
- return getCreditManager().useCreditForMessage(msg.getSize());
- }
-
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
similarity index 86%
rename from qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
rename to qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
index 1817e8a..af54c91 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
@@ -18,10 +18,13 @@
* under the License.
*
*/
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_8;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+
public class MessageOnlyCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
{
private final AtomicLong _messageCredit;
@@ -31,16 +34,6 @@
_messageCredit = new AtomicLong(initialCredit);
}
- public long getMessageCredit()
- {
- return _messageCredit.get();
- }
-
- public long getBytesCredit()
- {
- return -1L;
- }
-
public void restoreCredit(long messageCredit, long bytesCredit)
{
_messageCredit.addAndGet(messageCredit);
@@ -48,12 +41,6 @@
}
- public void removeAllCredit()
- {
- setSuspended(true);
- _messageCredit.set(0L);
- }
-
public boolean hasCredit()
{
return _messageCredit.get() > 0L;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
new file mode 100644
index 0000000..2d32617
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+
+/**
+ * Credit manager that never counts messages or bytes: credit is available
+ * whenever the connection's transport is not blocked for writing.
+ */
+public class NoAckCreditManager extends AbstractFlowCreditManager
+{
+    private final ServerProtocolEngine _protocolEngine;
+
+    /**
+     * @param serverProtocolEngine engine queried for the transport's write-blocked state
+     */
+    public NoAckCreditManager(ServerProtocolEngine serverProtocolEngine)
+    {
+        _protocolEngine = serverProtocolEngine;
+    }
+
+    /**
+     * Re-evaluates the suspended state from the current transport state.
+     * Both arguments are ignored because no credit is tracked.
+     */
+    @Override
+    public void restoreCredit(final long messageCredit, final long bytesCredit)
+    {
+        final boolean creditAvailable = hasCredit();
+        setSuspended(!creditAvailable);
+    }
+
+    /**
+     * @return {@code true} unless the transport is currently blocked for writing
+     */
+    @Override
+    public boolean hasCredit()
+    {
+        final boolean blocked = _protocolEngine.isTransportBlockedForWriting();
+        return !blocked;
+    }
+
+    /**
+     * Suspends this manager and refuses the message when the transport is blocked;
+     * otherwise accepts it without consuming anything.
+     *
+     * @param msgSize size of the message; unused
+     * @return whether the message may be sent
+     */
+    @Override
+    public boolean useCreditForMessage(final long msgSize)
+    {
+        final boolean creditAvailable = hasCredit();
+        if (!creditAvailable)
+        {
+            setSuspended(true);
+        }
+        return creditAvailable;
+    }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
similarity index 85%
rename from qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
rename to qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
index fc2d4bf..e63645e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
@@ -18,20 +18,28 @@
* under the License.
*
*/
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_8;
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+
public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
{
+ private final ServerProtocolEngine _protocolEngine;
private volatile long _bytesCreditLimit;
private volatile long _messageCreditLimit;
private volatile long _bytesCredit;
private volatile long _messageCredit;
- public Pre0_10CreditManager(long bytesCreditLimit, long messageCreditLimit)
+ public Pre0_10CreditManager(long bytesCreditLimit,
+ long messageCreditLimit,
+ ServerProtocolEngine protocolEngine)
{
+ _protocolEngine = protocolEngine;
_bytesCreditLimit = bytesCreditLimit;
_messageCreditLimit = messageCreditLimit;
_bytesCredit = bytesCreditLimit;
@@ -39,6 +47,7 @@
}
+
public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
{
long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit;
@@ -80,16 +89,6 @@
}
- public long getMessageCredit()
- {
- return _messageCredit;
- }
-
- public long getBytesCredit()
- {
- return _bytesCredit;
- }
-
public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
{
final long messageCreditLimit = _messageCreditLimit;
@@ -119,22 +118,21 @@
}
- public synchronized void removeAllCredit()
- {
- _bytesCredit = 0L;
- _messageCredit = 0L;
- setSuspended(!hasCredit());
- }
-
public synchronized boolean hasCredit()
{
return (_bytesCreditLimit == 0L || _bytesCredit > 0)
- && (_messageCreditLimit == 0L || _messageCredit > 0);
+ && (_messageCreditLimit == 0L || _messageCredit > 0)
+ && !_protocolEngine.isTransportBlockedForWriting();
}
public synchronized boolean useCreditForMessage(final long msgSize)
{
- if(_messageCreditLimit != 0L)
+ if (_protocolEngine.isTransportBlockedForWriting())
+ {
+ setSuspended(true);
+ return false;
+ }
+ else if(_messageCreditLimit != 0L)
{
if(_messageCredit != 0L)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index 9326f16..55fc865 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -31,8 +31,6 @@
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.flow.LimitlessCreditManager;
-import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.StoredMessage;
@@ -328,7 +326,7 @@
public void testMessageDequeueRestoresCreditTest() throws Exception
{
// Send 10 messages
- Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
+ Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1, _protocolEngine);
_subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index 7407890..622cf32 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -288,10 +288,6 @@
{
_sender = new Sender<ByteBuffer>()
{
- public void setIdleTimeout(int i)
- {
- }
-
public void send(ByteBuffer msg)
{
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java
similarity index 87%
rename from qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
rename to qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java
index 89fc606..c4c89ac 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java
@@ -18,20 +18,14 @@
* under the License.
*
*/
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_8;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+
public class LimitlessCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
{
- public long getMessageCredit()
- {
- return -1L;
- }
-
- public long getBytesCredit()
- {
- return -1L;
- }
public void restoreCredit(long messageCredit, long bytesCredit)
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 8e24d55..b55bd03 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -44,6 +44,7 @@
import org.apache.qpid.amqp_1_0.type.transport.End;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.Broker;
@@ -64,6 +65,7 @@
private final AmqpPort<?> _port;
private final Broker<?> _broker;
private final SubjectCreator _subjectCreator;
+ private final ServerProtocolEngine _protocolEngine;
private VirtualHostImpl _vhost;
private final Transport _transport;
private final ConnectionEndpoint _conn;
@@ -101,12 +103,16 @@
private boolean _closedOnOpen;
+
public Connection_1_0(Broker<?> broker,
ConnectionEndpoint conn,
long connectionId,
AmqpPort<?> port,
- Transport transport, final SubjectCreator subjectCreator)
+ Transport transport,
+ final SubjectCreator subjectCreator,
+ final ServerProtocolEngine protocolEngine)
{
+ _protocolEngine = protocolEngine;
_broker = broker;
_port = port;
_transport = transport;
@@ -363,6 +369,11 @@
return _port;
}
+ public ServerProtocolEngine getProtocolEngine()
+ {
+ return _protocolEngine;
+ }
+
@Override
public Transport getTransport()
{
@@ -480,4 +491,11 @@
}
+ public void transportStateChanged()
+ {
+ for (Session_1_0 session : _sessions)
+ {
+ session.transportStateChanged();
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index c5d9a5e..b5e1bda 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -40,6 +40,7 @@
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
@@ -84,7 +85,7 @@
public boolean isSuspended()
{
- return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend();
+ return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;
}
@@ -290,7 +291,9 @@
{
synchronized (_link.getLock())
{
- final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
+
+ ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+ final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
if(!hasCredit && getState() == State.ACTIVE)
{
suspend();
@@ -330,7 +333,8 @@
{
synchronized(_link.getLock())
{
- if(isSuspended() && getEndpoint() != null)
+ ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+ if(isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
{
updateState(State.SUSPENDED, State.ACTIVE);
_transactionId = _link.getTransactionId();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 740b01e..b2783a2 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -118,6 +118,7 @@
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
private Connection_1_0 _connection;
+ private volatile boolean _transportBlockedForWriting;
static enum State {
@@ -179,6 +180,11 @@
//Todo
}
+ @Override
+ public void encryptedTransport()
+ {
+ }
+
public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
{
_network = network;
@@ -211,7 +217,7 @@
_endpoint.setProperties(serverProperties);
_endpoint.setRemoteAddress(getRemoteAddress());
- _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator);
+ _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator, this);
_endpoint.setConnectionEventListener(_connection);
_endpoint.setFrameOutputHandler(this);
@@ -524,6 +530,8 @@
}
+
+
public void close()
{
_sender.close();
@@ -554,4 +562,35 @@
{
return _lastWriteTime;
}
+
+    /**
+     * Reports the blocked-for-writing flag held by this engine.
+     *
+     * @return the value most recently passed to {@link #setTransportBlockedForWriting(boolean)}
+     */
+    @Override
+    public boolean isTransportBlockedForWriting()
+    {
+        return _transportBlockedForWriting;
+    }
+
+    /**
+     * Stores the blocked-for-writing flag and tells the connection that the transport state
+     * changed.
+     *
+     * @param blocked the new blocked-for-writing state
+     */
+    @Override
+    public void setTransportBlockedForWriting(final boolean blocked)
+    {
+        _transportBlockedForWriting = blocked;
+        // _connection has no initializer and is assigned separately from construction, so a call
+        // arriving before then only records the flag instead of failing with an NPE.
+        final Connection_1_0 connection = _connection;
+        if (connection != null)
+        {
+            connection.transportStateChanged();
+        }
+    }
+
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 2cfe431..f8e4853 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -728,4 +728,9 @@
{
return _consumer;
}
+
+ public ConsumerTarget_1_0 getConsumerTarget()
+ {
+ return _target;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 1820de9..01c11b9 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -109,6 +109,7 @@
private final Subject _subject = new Subject();
private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+ private final CopyOnWriteArrayList<SendingLink_1_0> _sendingLinks = new CopyOnWriteArrayList<>();
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject;
@@ -211,7 +212,7 @@
);
sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink));
- registerConsumer(sendingLink.getConsumer());
+ registerConsumer(sendingLink);
link = sendingLink;
if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
@@ -411,12 +412,14 @@
}
}
- private void registerConsumer(final ConsumerImpl consumer)
+ private void registerConsumer(final SendingLink_1_0 link)
{
+ ConsumerImpl consumer = link.getConsumer();
if(consumer instanceof Consumer<?>)
{
Consumer<?> modelConsumer = (Consumer<?>) consumer;
_consumers.add(modelConsumer);
+ _sendingLinks.add(link);
modelConsumer.addChangeListener(_consumerClosedListener);
consumerAdded(modelConsumer);
}
@@ -614,6 +617,20 @@
}
@Override
+    public void transportStateChanged()
+    {
+        // Relay the transport state change to every sending link tracked by this session by
+        // calling flowStateChanged() on that link's consumer target. _sendingLinks is a
+        // CopyOnWriteArrayList, so this loop iterates over a snapshot of the list.
+        for (final SendingLink_1_0 sendingLink : _sendingLinks)
+        {
+            // NOTE(review): assumes getConsumerTarget() is non-null for registered links - confirm.
+            final ConsumerTarget_1_0 consumerTarget = sendingLink.getConsumerTarget();
+            consumerTarget.flowStateChanged();
+        }
+    }
+
+ @Override
public LogSubject getLogSubject()
{
return this;
diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index a194ac7..940e24d 100644
--- a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -81,9 +81,7 @@
_supported = supported;
_defaultSupportedProtocolReply = defaultSupportedProtocolReply;
_factory = new MultiVersionProtocolEngineFactory(
- _port.getParent(Broker.class), null,
- _port.getWantClientAuth(),
- _port.getNeedClientAuth(),
+ _port.getParent(Broker.class),
_supported,
_defaultSupportedProtocolReply,
_port,
@@ -273,12 +271,6 @@
}
@Override
- public void setIdleTimeout(final int i)
- {
-
- }
-
- @Override
public void send(final ByteBuffer msg)
{
try
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index c2582ac..d5e3027 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -316,6 +316,11 @@
}
}
+ @Override
+ public void encryptedTransport()
+ {
+ }
+
public void readerIdle()
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
index 8a7e6ab..c7dee5b 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
@@ -670,7 +670,7 @@
{
private List<ProtocolEvent> _sendEvents = new ArrayList<ProtocolEvent>();
- public void setIdleTimeout(int i)
+ private void setIdleTimeout(int i)
{
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
index 7c3988c1..11b34d3 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
@@ -27,11 +27,6 @@
public class MockSender implements Sender<ByteBuffer>
{
- public void setIdleTimeout(int i)
- {
-
- }
-
public void send(ByteBuffer msg)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 6774d0a..cad5461 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -20,14 +20,14 @@
*/
package org.apache.qpid.protocol;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
/**
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
* decodes it and then process the result.
@@ -56,7 +56,8 @@
// Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
void readerIdle();
+ void encryptedTransport();
public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
-}
\ No newline at end of file
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
index 5c6918e..35d262c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
@@ -30,4 +30,8 @@
long getConnectionId();
Subject getSubject();
+
+ boolean isTransportBlockedForWriting();
+
+ void setTransportBlockedForWriting(boolean blocked);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index 1866e1f..c2a0911 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -153,11 +153,9 @@
maxFrameSize,
actualHeartbeatInterval);
- int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
conn.setMaxFrameSize(maxFrameSize == 0 ? 0xffff : maxFrameSize);
- conn.setIdleTimeout(idleTimeout);
int channelMax = tune.getChannelMax();
//0 means no implied limit, except available server resources
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index e33e007..92ccdb8 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -159,7 +159,6 @@
public void setSender(Sender<ProtocolEvent> sender)
{
this.sender = sender;
- sender.setIdleTimeout(idleTimeout);
}
protected void setState(State state)
@@ -675,20 +674,6 @@
}
}
- public void setIdleTimeout(int i)
- {
- idleTimeout = i;
- if (sender != null)
- {
- sender.setIdleTimeout(i);
- }
- }
-
- public int getIdleTimeout()
- {
- return idleTimeout;
- }
-
public String getUserID()
{
return userID;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
index 6519702..9a6f675 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
@@ -28,7 +28,6 @@
public interface Sender<T>
{
- void setIdleTimeout(int i);
void send(T msg);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index 82a677b..f8fd286 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -126,8 +126,11 @@
protected void connectionAuthFailed(final Connection conn, Exception e)
{
- conn.exception(e);
- conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
+ if (e != null)
+ {
+ conn.exception(e);
+ }
+ conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e == null ? "Authentication failed" : e.getMessage());
}
protected void connectionAuthContinue(final Connection conn, byte[] challenge)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index a804cb2..81a4c78 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -251,11 +251,6 @@
throw new IllegalArgumentException(String.valueOf(error));
}
- public void setIdleTimeout(int i)
- {
- sender.setIdleTimeout(i);
- }
-
@Override
public void setMaxFrameSize(final int maxFrame)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
index e0cd9ca..f378c54 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.transport.network;
+import java.util.Set;
+
import javax.net.ssl.SSLContext;
import org.apache.qpid.protocol.ProtocolEngineFactory;
@@ -29,7 +31,8 @@
{
public void accept(NetworkTransportConfiguration config,
ProtocolEngineFactory factory,
- SSLContext sslContext);
+ SSLContext sslContext,
+ final Set<TransportEncryption> encryptionSet);
public int getAcceptingPort();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java
new file mode 100644
index 0000000..b3f1f1c
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+public enum TransportEncryption
+{
+ NONE, TLS
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
new file mode 100644
index 0000000..9d0fe5d
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
@@ -0,0 +1,343 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.TransportActivity;
+import org.apache.qpid.transport.network.TransportEncryption;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+
+// TODO we are no longer using the IncomingNetworkTransport
+public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+{
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
+ private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+ CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+ private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
+ CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
+ private Socket _socket;
+ private NetworkConnection _connection;
+ private AcceptingThread _acceptor;
+
+    /**
+     * Opens a socket to the host and port in {@code settings}, wraps it in a network connection
+     * that hands received bytes to {@code delegate}, and starts that connection.
+     * @throws TransportException if the socket cannot be connected or the connection cannot be created
+     */
+    public NetworkConnection connect(ConnectionSettings settings,
+                                     Receiver<ByteBuffer> delegate,
+                                     TransportActivity transportActivity)
+    {
+        int sendBufferSize = settings.getWriteBufferSize();
+        int receiveBufferSize = settings.getReadBufferSize();
+
+        try
+        {
+            _socket = new Socket();
+            _socket.setReuseAddress(true);
+            _socket.setTcpNoDelay(settings.isTcpNodelay());
+            _socket.setSendBufferSize(sendBufferSize);
+            _socket.setReceiveBufferSize(receiveBufferSize);
+            if(LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize());
+                LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize());
+                LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay());
+            }
+
+            InetAddress address = InetAddress.getByName(settings.getHost());
+            _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout());
+        }
+        catch (IOException e)
+        {
+            // NOTE(review): _socket is left unclosed on this path; consider closing it before rethrowing.
+            throw new TransportException("Error connecting to broker", e);
+        }
+
+        try
+        {
+            IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
+            _connection = createNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
+            ticker.setConnection(_connection);
+            _connection.start();
+        }
+        catch(Exception e)
+        {
+            try
+            {
+                _socket.close();
+            }
+            catch(IOException ioe)
+            {
+                //ignored, throw based on original exception
+            }
+
+            throw new TransportException("Error creating network connection", e);
+        }
+
+        return _connection;
+    }
+
+ public void close()
+ {
+ if(_connection != null)
+ {
+ _connection.close();
+ }
+ if(_acceptor != null)
+ {
+ _acceptor.close();
+ }
+ }
+
+ public NetworkConnection getConnection()
+ {
+ return _connection;
+ }
+
+ public void accept(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext, final Set<TransportEncryption> encryptionSet)
+ {
+ try
+ {
+ _acceptor = new AcceptingThread(config, factory, sslContext);
+ _acceptor.setDaemon(false);
+ _acceptor.start();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Failed to start AMQP on port : " + config, e);
+ }
+ }
+
+ public int getAcceptingPort()
+ {
+ return _acceptor == null ? -1 : _acceptor.getPort();
+ }
+
+ protected abstract NetworkConnection createNetworkConnection(Socket socket,
+ Receiver<ByteBuffer> engine,
+ Integer sendBufferSize,
+ Integer receiveBufferSize,
+ int timeout,
+ IdleTimeoutTicker ticker);
+
+ private class AcceptingThread extends Thread
+ {
+ private volatile boolean _closed = false;
+ private NetworkTransportConfiguration _config;
+ private ProtocolEngineFactory _factory;
+ private SSLContext _sslContext;
+ private ServerSocket _serverSocket;
+ private int _timeout;
+
+        /** Creates and binds the server socket (TLS when sslContext is non-null); does not start the thread. */
+        private AcceptingThread(NetworkTransportConfiguration config,
+                                ProtocolEngineFactory factory,
+                                SSLContext sslContext) throws IOException
+        {
+            _config = config;
+            _factory = factory;
+            _sslContext = sslContext;
+            _timeout = TIMEOUT;
+
+            InetSocketAddress address = config.getAddress();
+            // Name the thread after its listen address so it can be identified in thread dumps.
+            setName(String.format("IoNetworkAcceptor - %s", address));
+
+            if(sslContext == null)
+            {
+                _serverSocket = new ServerSocket();
+            }
+            else
+            {
+                SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory();
+                _serverSocket = socketFactory.createServerSocket();
+                SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket;
+                SSLUtil.removeSSLv3Support(sslServerSocket);
+
+                if(config.needClientAuth())
+                {
+                    sslServerSocket.setNeedClientAuth(true);
+                }
+                else if(config.wantClientAuth())
+                {
+                    sslServerSocket.setWantClientAuth(true);
+                }
+            }
+
+            _serverSocket.setReuseAddress(true);
+            _serverSocket.bind(address);
+        }
+
+
+ /**
+ Close the underlying ServerSocket if it has not already been closed.
+ */
+ public void close()
+ {
+ LOGGER.debug("Shutting down the Acceptor");
+ _closed = true;
+
+ if (!_serverSocket.isClosed())
+ {
+ try
+ {
+ _serverSocket.close();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+ }
+ }
+
+ private int getPort()
+ {
+ return _serverSocket.getLocalPort();
+ }
+
+        /**
+         * Accepts connections until the acceptor is closed or its back-off sleep is interrupted,
+         * handing each accepted socket to a protocol engine obtained from the factory.
+         */
+        @Override
+        public void run()
+        {
+            try
+            {
+                while (!_closed)
+                {
+                    Socket socket = null;
+                    try
+                    {
+                        socket = _serverSocket.accept();
+
+                        ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress());
+
+                        if(engine != null)
+                        {
+                            socket.setTcpNoDelay(_config.getTcpNoDelay());
+                            socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
+
+                            final Integer sendBufferSize = _config.getSendBufferSize();
+                            final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+                            socket.setSendBufferSize(sendBufferSize);
+                            socket.setReceiveBufferSize(receiveBufferSize);
+
+                            final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+                            NetworkConnection connection = createNetworkConnection(socket, engine,
+                                                                                   sendBufferSize, receiveBufferSize,
+                                                                                   _timeout, ticker);
+
+                            connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
+                            ticker.setConnection(connection);
+                            engine.setNetworkConnection(connection, connection.getSender());
+
+                            connection.start();
+                        }
+                        else
+                        {
+                            // The factory supplied no engine for this peer, so drop the connection.
+                            socket.close();
+                        }
+                    }
+                    catch(RuntimeException e)
+                    {
+                        LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
+                        closeSocketIfNecessary(socket);
+                    }
+                    catch(IOException e)
+                    {
+                        // Release a half-set-up socket even when the acceptor is already closing.
+                        closeSocketIfNecessary(socket);
+                        if(!_closed)
+                        {
+                            LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
+                            try
+                            {
+                                //Delay to avoid tight spinning the loop during issues such as too many open files
+                                Thread.sleep(1000);
+                            }
+                            catch (InterruptedException ie)
+                            {
+                                LOGGER.debug("Stopping acceptor due to interrupt request");
+                                _closed = true;
+                                // Restore the interrupt status that catching the exception cleared.
+                                Thread.currentThread().interrupt();
+                            }
+                        }
+                    }
+                }
+            }
+            finally
+            {
+                if(LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Acceptor exiting, no new connections will be accepted on address "
+                                 + _config.getAddress());
+                }
+            }
+        }
+
+ private void closeSocketIfNecessary(final Socket socket)
+ {
+ if(socket != null)
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.debug("Exception while closing socket", e);
+ }
+ }
+ }
+
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index e5bc9fa..f33f626 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -20,312 +20,25 @@
*/
package org.apache.qpid.transport.network.io;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
import java.net.Socket;
-import java.net.SocketException;
import java.nio.ByteBuffer;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
-
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.configuration.CommonProperties;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.TransportActivity;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
-public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+public class IoNetworkTransport extends AbstractNetworkTransport
{
- private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
- private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
- CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
- private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
- CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
- private Socket _socket;
- private IoNetworkConnection _connection;
- private AcceptingThread _acceptor;
-
- public NetworkConnection connect(ConnectionSettings settings,
- Receiver<ByteBuffer> delegate,
- TransportActivity transportActivity)
+ @Override
+ protected IoNetworkConnection createNetworkConnection(final Socket socket,
+ final Receiver<ByteBuffer> engine,
+ final Integer sendBufferSize,
+ final Integer receiveBufferSize,
+ final int timeout,
+ final IdleTimeoutTicker ticker)
{
- int sendBufferSize = settings.getWriteBufferSize();
- int receiveBufferSize = settings.getReadBufferSize();
-
- try
- {
- _socket = new Socket();
- _socket.setReuseAddress(true);
- _socket.setTcpNoDelay(settings.isTcpNodelay());
- _socket.setSendBufferSize(sendBufferSize);
- _socket.setReceiveBufferSize(receiveBufferSize);
-
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize());
- LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize());
- LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay());
- }
-
- InetAddress address = InetAddress.getByName(settings.getHost());
-
- _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout());
- }
- catch (SocketException e)
- {
- throw new TransportException("Error connecting to broker", e);
- }
- catch (IOException e)
- {
- throw new TransportException("Error connecting to broker", e);
- }
-
- try
- {
- IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
- _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
- ticker.setConnection(_connection);
- _connection.start();
- }
- catch(Exception e)
- {
- try
- {
- _socket.close();
- }
- catch(IOException ioe)
- {
- //ignored, throw based on original exception
- }
-
- throw new TransportException("Error creating network connection", e);
- }
-
- return _connection;
- }
-
- public void close()
- {
- if(_connection != null)
- {
- _connection.close();
- }
- if(_acceptor != null)
- {
- _acceptor.close();
- }
- }
-
- public NetworkConnection getConnection()
- {
- return _connection;
- }
-
- public void accept(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory,
- SSLContext sslContext)
- {
- try
- {
- _acceptor = new AcceptingThread(config, factory, sslContext);
- _acceptor.setName(String.format("IoNetworkAcceptor - %s", config.getAddress()));
- _acceptor.setDaemon(false);
- _acceptor.start();
- }
- catch (IOException e)
- {
- throw new TransportException("Failed to start AMQP on port : " + config, e);
- }
- }
-
- public int getAcceptingPort()
- {
- return _acceptor == null ? -1 : _acceptor.getPort();
- }
-
- private class AcceptingThread extends Thread
- {
- private volatile boolean _closed = false;
- private NetworkTransportConfiguration _config;
- private ProtocolEngineFactory _factory;
- private SSLContext _sslContext;
- private ServerSocket _serverSocket;
- private int _timeout;
-
- private AcceptingThread(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory,
- SSLContext sslContext) throws IOException
- {
- _config = config;
- _factory = factory;
- _sslContext = sslContext;
- _timeout = TIMEOUT;
-
- InetSocketAddress address = config.getAddress();
-
- if(sslContext == null)
- {
- _serverSocket = new ServerSocket();
- }
- else
- {
- SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory();
- _serverSocket = socketFactory.createServerSocket();
-
- SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket;
-
- SSLUtil.removeSSLv3Support(sslServerSocket);
-
- if(config.needClientAuth())
- {
- sslServerSocket.setNeedClientAuth(true);
- }
- else if(config.wantClientAuth())
- {
- sslServerSocket.setWantClientAuth(true);
- }
-
- }
-
- _serverSocket.setReuseAddress(true);
- _serverSocket.bind(address);
- }
-
-
- /**
- Close the underlying ServerSocket if it has not already been closed.
- */
- public void close()
- {
- LOGGER.debug("Shutting down the Acceptor");
- _closed = true;
-
- if (!_serverSocket.isClosed())
- {
- try
- {
- _serverSocket.close();
- }
- catch (IOException e)
- {
- throw new TransportException(e);
- }
- }
- }
-
- private int getPort()
- {
- return _serverSocket.getLocalPort();
- }
-
- @Override
- public void run()
- {
- try
- {
- while (!_closed)
- {
- Socket socket = null;
- try
- {
- socket = _serverSocket.accept();
-
- ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress());
-
- if(engine != null)
- {
- socket.setTcpNoDelay(_config.getTcpNoDelay());
- socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
-
- final Integer sendBufferSize = _config.getSendBufferSize();
- final Integer receiveBufferSize = _config.getReceiveBufferSize();
-
- socket.setSendBufferSize(sendBufferSize);
- socket.setReceiveBufferSize(receiveBufferSize);
-
-
- final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
- NetworkConnection connection =
- new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
- ticker);
-
- connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
-
- ticker.setConnection(connection);
-
- engine.setNetworkConnection(connection, connection.getSender());
-
- connection.start();
- }
- else
- {
- socket.close();
- }
- }
- catch(RuntimeException e)
- {
- LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
- closeSocketIfNecessary(socket);
- }
- catch(IOException e)
- {
- if(!_closed)
- {
- LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
- closeSocketIfNecessary(socket);
- try
- {
- //Delay to avoid tight spinning the loop during issues such as too many open files
- Thread.sleep(1000);
- }
- catch (InterruptedException ie)
- {
- LOGGER.debug("Stopping acceptor due to interrupt request");
- _closed = true;
- }
- }
- }
- }
- }
- finally
- {
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + _config.getAddress());
- }
- }
- }
-
- private void closeSocketIfNecessary(final Socket socket)
- {
- if(socket != null)
- {
- try
- {
- socket.close();
- }
- catch (IOException e)
- {
- LOGGER.debug("Exception while closing socket", e);
- }
- }
- }
-
+ return new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout,
+ ticker);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index b52b59a..467115c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -78,7 +78,7 @@
throw new RuntimeException("Error creating IOReceiver thread",e);
}
receiverThread.setDaemon(true);
- receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+ receiverThread.setName(String.format("IoReceiver-%s", socket.getRemoteSocketAddress()));
}
public void initiate()
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index e06782c..79e9928 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -88,7 +88,7 @@
}
senderThread.setDaemon(true);
- senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress));
+ senderThread.setName(String.format("IoSender-%s", _remoteSocketAddress));
}
public void initiate()
@@ -316,18 +316,6 @@
}
}
- public void setIdleTimeout(int i)
- {
- try
- {
- socket.setSoTimeout(i);
- }
- catch (Exception e)
- {
- throw new SenderException(e);
- }
- }
-
public void setReceiver(IoReceiver receiver)
{
_receiver = receiver;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
new file mode 100644
index 0000000..92cea34
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
@@ -0,0 +1,134 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.security.Principal;
+import java.util.Set;
+
+import javax.net.ssl.SSLContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportEncryption;
+
+public class NonBlockingConnection implements NetworkConnection
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
+    private final SocketChannel _socket;
+    private final long _timeout;
+    private final NonBlockingSenderReceiver _nonBlockingSenderReceiver;
+    private int _maxReadIdle;
+    private int _maxWriteIdle;
+    private Principal _principal;
+    private boolean _principalChecked;
+    private final Object _lock = new Object();
+    /** Wraps the channel in a NonBlockingSenderReceiver; its I/O thread is not started until start() is called. */
+    public NonBlockingConnection(SocketChannel socket,
+                                 ServerProtocolEngine delegate,
+                                 int sendBufferSize,
+                                 int receiveBufferSize,
+                                 long timeout,
+                                 Ticker ticker,
+                                 final Set<TransportEncryption> encryptionSet,
+                                 final SSLContext sslContext,
+                                 final boolean wantClientAuth,
+                                 final boolean needClientAuth,
+                                 final Runnable onTransportEncryptionAction)
+    {
+        _socket = socket;
+        _timeout = timeout;
+        // NOTE(review): sendBufferSize is accepted but not forwarded; only receiveBufferSize reaches the sender/receiver.
+        _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction);
+
+    }
+    /** Starts the sender/receiver's I/O thread. */
+    public void start()
+    {
+        _nonBlockingSenderReceiver.initiate();
+    }
+    /** Returns the combined sender/receiver, which is the Sender for this connection. */
+    public Sender<ByteBuffer> getSender()
+    {
+        return _nonBlockingSenderReceiver;
+    }
+    /** Delegates to the sender/receiver's close(). */
+    public void close()
+    {
+        _nonBlockingSenderReceiver.close();
+    }
+    /** Returns the remote address reported by the channel's underlying socket. */
+    public SocketAddress getRemoteAddress()
+    {
+        return _socket.socket().getRemoteSocketAddress();
+    }
+    /** Returns the local address reported by the channel's underlying socket. */
+    public SocketAddress getLocalAddress()
+    {
+        return _socket.socket().getLocalSocketAddress();
+    }
+    /** Stores the write-idle limit; this class only records it (unit presumably seconds, per the parameter name). */
+    public void setMaxWriteIdle(int sec)
+    {
+        _maxWriteIdle = sec;
+    }
+    /** Stores the read-idle limit; this class only records it (unit presumably seconds, per the parameter name). */
+    public void setMaxReadIdle(int sec)
+    {
+        _maxReadIdle = sec;
+    }
+    /** Returns the peer principal, looked up from the sender/receiver once under _lock and cached afterwards. */
+    @Override
+    public Principal getPeerPrincipal()
+    {
+        synchronized (_lock)
+        {
+            if(!_principalChecked)
+            {
+                // The first answer (including null) is cached; later calls never ask the sender/receiver again.
+                _principal = _nonBlockingSenderReceiver.getPeerPrincipal();
+
+                _principalChecked = true;
+            }
+
+            return _principal;
+        }
+    }
+    /** Returns the value last passed to setMaxReadIdle (0 if never set). */
+    @Override
+    public int getMaxReadIdle()
+    {
+        return _maxReadIdle;
+    }
+    /** Returns the value last passed to setMaxWriteIdle (0 if never set). */
+    @Override
+    public int getMaxWriteIdle()
+    {
+        return _maxWriteIdle;
+    }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
new file mode 100644
index 0000000..80ba7a0
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
@@ -0,0 +1,268 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.StandardSocketOptions;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Set;
+
+import javax.net.ssl.SSLContext;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.TransportEncryption;
+
+public class NonBlockingNetworkTransport implements IncomingNetworkTransport
+{
+    // Accepts inbound connections on a dedicated thread and wraps each accepted channel in a NonBlockingConnection.
+    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(NonBlockingNetworkTransport.class);
+    private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+                                                          CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+    private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
+                                                                    CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
+    private AcceptingThread _acceptor;
+    /** Builds the NonBlockingConnection for a newly accepted channel. */
+    protected NonBlockingConnection createNetworkConnection(final SocketChannel socket,
+                                                            final ServerProtocolEngine engine,
+                                                            final Integer sendBufferSize,
+                                                            final Integer receiveBufferSize,
+                                                            final int timeout,
+                                                            final IdleTimeoutTicker ticker,
+                                                            final Set<TransportEncryption> encryptionSet,
+                                                            final SSLContext sslContext,
+                                                            final boolean wantClientAuth,
+                                                            final boolean needClientAuth,
+                                                            final Runnable onTransportEncryptionAction)
+    {
+        return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction);
+    }
+    /** Closes the acceptor (and its server socket) if one has been created; otherwise does nothing. */
+    public void close()
+    {
+        if(_acceptor != null)
+        {
+            _acceptor.close();
+        }
+    }
+    /** Binds the configured address and starts the (non-daemon) accepting thread; bind failures surface as TransportException. */
+    public void accept(NetworkTransportConfiguration config,
+                       ProtocolEngineFactory factory,
+                       SSLContext sslContext,
+                       final Set<TransportEncryption> encryptionSet)
+    {
+        try
+        {
+            _acceptor = new AcceptingThread(config, factory, sslContext, encryptionSet);
+            _acceptor.setDaemon(false);
+            _acceptor.start();
+        }
+        catch (IOException e)
+        {
+            throw new TransportException("Failed to start AMQP on port : " + config, e);
+        }
+    }
+    /** Returns the local port of the server socket, or -1 when no acceptor has been created. */
+    public int getAcceptingPort()
+    {
+        return _acceptor == null ? -1 : _acceptor.getPort();
+    }
+    // Thread that owns the server socket and turns every accepted channel into a started connection.
+    private class AcceptingThread extends Thread
+    {
+        private final Set<TransportEncryption> _encryptionSet;
+        private volatile boolean _closed = false;
+        private final NetworkTransportConfiguration _config;
+        private final ProtocolEngineFactory _factory;
+        private final SSLContext _sslContext;
+        private final ServerSocketChannel _serverSocket;
+        private int _timeout;
+
+        private AcceptingThread(NetworkTransportConfiguration config,
+                                ProtocolEngineFactory factory,
+                                SSLContext sslContext,
+                                final Set<TransportEncryption> encryptionSet) throws IOException
+        {
+            _config = config;
+            _factory = factory;
+            _sslContext = sslContext;
+            _timeout = TIMEOUT;
+
+            InetSocketAddress address = config.getAddress();
+
+            _serverSocket = ServerSocketChannel.open();
+
+            _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+            _serverSocket.bind(address);
+            _encryptionSet = encryptionSet;
+        }
+
+
+        /**
+         * Close the underlying ServerSocket if it has not already been closed.
+         */
+        public void close()
+        {
+            LOGGER.debug("Shutting down the Acceptor");
+            _closed = true;
+
+            if (!_serverSocket.socket().isClosed())
+            {
+                try
+                {
+                    _serverSocket.close();
+                }
+                catch (IOException e)
+                {
+                    throw new TransportException(e);
+                }
+            }
+        }
+
+        private int getPort()
+        {
+            return _serverSocket.socket().getLocalPort();
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                while (!_closed)
+                {
+                    SocketChannel socket = null;
+                    try
+                    {
+                        socket = _serverSocket.accept();
+
+                        final ServerProtocolEngine engine =
+                                (ServerProtocolEngine) _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress());
+
+                        if(engine != null)
+                        {
+                            socket.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+                            socket.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
+
+                            final Integer sendBufferSize = _config.getSendBufferSize();
+                            final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+                            socket.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+                            socket.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+
+                            final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+
+                            NetworkConnection connection =
+                                    createNetworkConnection(socket,
+                                                            engine,
+                                                            sendBufferSize,
+                                                            receiveBufferSize,
+                                                            _timeout,
+                                                            ticker,
+                                                            _encryptionSet,
+                                                            _sslContext,
+                                                            _config.wantClientAuth(),
+                                                            _config.needClientAuth(),
+                                                            new Runnable()
+                                                            {
+                                                                @Override
+                                                                public void run()
+                                                                {
+                                                                    engine.encryptedTransport();
+                                                                }
+                                                            });
+
+                            engine.setNetworkConnection(connection, connection.getSender());
+                            connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+
+                            ticker.setConnection(connection);
+
+                            connection.start();
+                        }
+                        else
+                        {
+                            socket.close();
+                        }
+                    }
+                    catch(RuntimeException e)
+                    {
+                        LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
+                        // socket is still null if accept() itself failed; the helper tolerates that
+                        closeSocketIfNecessary(socket);
+                    }
+                    catch(IOException e)
+                    {
+                        if(!_closed)
+                        {
+                            LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
+                            // socket is still null if accept() itself failed; the helper tolerates that
+                            closeSocketIfNecessary(socket);
+                            try
+                            {
+                                //Delay to avoid tight spinning the loop during issues such as too many open files
+                                Thread.sleep(1000);
+                            }
+                            catch (InterruptedException ie)
+                            {
+                                LOGGER.debug("Stopping acceptor due to interrupt request");
+                                _closed = true;
+                            }
+                        }
+                    }
+                }
+            }
+            finally
+            {
+                if(LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Acceptor exiting, no new connections will be accepted on address "
+                                 + _config.getAddress());
+                }
+            }
+        }
+        /** Best-effort close of an accepted channel; a null channel is ignored and close failures are only logged. */
+        private void closeSocketIfNecessary(final SocketChannel socket)
+        {
+            if(socket != null)
+            {
+                try
+                {
+                    socket.close();
+                }
+                catch (IOException e)
+                {
+                    LOGGER.debug("Exception while closing socket", e);
+                }
+            }
+        }
+
+    }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
new file mode 100644
index 0000000..acc2b89
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLPeerUnverifiedException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderClosedException;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportEncryption;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+
+public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class);
+    public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
+
+    private final SocketChannel _socketChannel;
+    private final Selector _selector;
+    // _buffers is filled by send() callers and drained by the I/O thread; _encryptedOutput holds wrapped TLS records not yet on the wire.
+    private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
+    private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
+
+    private final Thread _ioThread;
+    private final String _remoteSocketAddress;
+    private final AtomicBoolean _closed = new AtomicBoolean(false);
+    private final ServerProtocolEngine _receiver;
+    private final int _receiveBufSize;
+    private final Ticker _ticker;
+    private final Set<TransportEncryption> _encryptionSet;
+    private final SSLContext _sslContext;
+    private final Runnable _onTransportEncryptionAction;
+    private ByteBuffer _netInputBuffer;
+    private SSLEngine _sslEngine;
+
+    private ByteBuffer _currentBuffer;
+
+    private TransportEncryption _transportEncryption;
+    private SSLEngineResult _status;
+
+    /** Makes the channel non-blocking, registers it for OP_READ on a new selector and creates (but does not start) the I/O thread. */
+    public NonBlockingSenderReceiver(final SocketChannel socketChannel,
+                                     ServerProtocolEngine receiver,
+                                     int receiveBufSize,
+                                     Ticker ticker,
+                                     final Set<TransportEncryption> encryptionSet,
+                                     final SSLContext sslContext,
+                                     final boolean wantClientAuth,
+                                     final boolean needClientAuth,
+                                     final Runnable onTransportEncryptionAction)
+    {
+        _socketChannel = socketChannel;
+        _receiver = receiver;
+        _receiveBufSize = receiveBufSize;
+        _ticker = ticker;
+        _encryptionSet = encryptionSet;
+        _sslContext = sslContext;
+        _onTransportEncryptionAction = onTransportEncryptionAction;
+        // With exactly one permitted encryption the mode is known up front; otherwise doRead() detects it from the first bytes.
+        if(encryptionSet.size() == 1)
+        {
+            _transportEncryption = _encryptionSet.iterator().next();
+            if (_transportEncryption == TransportEncryption.TLS)
+            {
+                onTransportEncryptionAction.run();
+            }
+        }
+        // The engine and the network input buffer are created whenever TLS is a permitted encryption.
+        if(encryptionSet.contains(TransportEncryption.TLS))
+        {
+            _sslEngine = _sslContext.createSSLEngine();
+            _sslEngine.setUseClientMode(false);
+            SSLUtil.removeSSLv3Support(_sslEngine);
+            if(needClientAuth)
+            {
+                _sslEngine.setNeedClientAuth(true);
+            }
+            else if(wantClientAuth)
+            {
+                _sslEngine.setWantClientAuth(true);
+            }
+            _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2));
+        }
+
+        try
+        {
+            _remoteSocketAddress = socketChannel.getRemoteAddress().toString();
+            _socketChannel.configureBlocking(false);
+            _selector = Selector.open();
+            _socketChannel.register(_selector, SelectionKey.OP_READ);
+        }
+        catch (IOException e)
+        {
+            throw new SenderException("Unable to prepare the channel for non-blocking IO", e);
+        }
+        try
+        {
+            //Create but deliberately don't start the thread.
+            _ioThread = Threading.getThreadFactory().createThread(this);
+        }
+        catch(Exception e)
+        {
+            throw new SenderException("Error creating NonBlockingSenderReceiver thread for " + _remoteSocketAddress, e);
+        }
+
+        _ioThread.setDaemon(true);
+        _ioThread.setName(String.format("NonBlockingSenderReceiver-%s", _remoteSocketAddress));
+
+    }
+    /** Starts the I/O thread created by the constructor. */
+    public void initiate()
+    {
+        _ioThread.start();
+    }
+    /** Queues msg for the I/O thread and wakes the selector; throws SenderClosedException once this object is closed. */
+    @Override
+    public void send(final ByteBuffer msg)
+    {
+        if (_closed.get())
+        {
+            throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed");
+        }
+        // append to list and do selector wakeup
+        _buffers.add(msg);
+        _selector.wakeup();
+    }
+    /** I/O loop: tick, select, write/read/write until closed, then drain pending output and close selector and channel. */
+    @Override
+    public void run()
+    {
+        LOGGER.debug("I/O for thread " + _remoteSocketAddress + " started");
+
+
+        while (!_closed.get())
+        {
+
+            try
+            {
+                long currentTime = System.currentTimeMillis();
+                int tick = _ticker.getTimeToNextTick(currentTime);
+                if(tick <= 0)
+                {
+                    tick = _ticker.tick(currentTime);
+                }
+                // select(0) would block indefinitely, so a non-positive tick is clamped to 1ms
+                _selector.select(tick <= 0 ? 1 : tick);
+                Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+                selectionKeys.clear();
+
+                _receiver.setTransportBlockedForWriting(!doWrite());
+                doRead();
+                boolean fullyWritten = doWrite();
+                _receiver.setTransportBlockedForWriting(!fullyWritten);
+                // only ask for OP_WRITE while output is still pending
+                _socketChannel.register(_selector,
+                                        fullyWritten
+                                                ? SelectionKey.OP_READ
+                                                : (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
+
+            }
+            catch (IOException e)
+            {
+                LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e);
+                close();
+            }
+        }
+        // try-with-resources closes the selector and the channel once the remaining output has been drained
+        try(Selector selector = _selector; SocketChannel channel = _socketChannel)
+        {
+            while(!doWrite())
+            {
+            }
+
+            _receiver.closed();
+        }
+        catch (IOException e)
+        {
+            LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e);
+        }
+        finally
+        {
+            LOGGER.debug("Shutting down IO thread for " + _remoteSocketAddress);
+        }
+    }
+    /** Wakes the selector so the I/O thread runs another write/read pass. */
+    @Override
+    public void flush()
+    {
+        _selector.wakeup();
+    }
+    /** Marks this object closed and wakes the I/O thread, which performs the actual shutdown in run(). */
+    @Override
+    public void close()
+    {
+        LOGGER.debug("Closing " + _remoteSocketAddress);
+
+        _closed.set(true);
+        _selector.wakeup();
+
+    }
+    /** Writes a snapshot of the queued buffers (wrapping them first under TLS); returns true only when nothing is left to send. */
+    private boolean doWrite() throws IOException
+    {
+
+        ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
+        Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
+        for (int i = 0; i < bufArray.length; i++)
+        {
+            bufArray[i] = bufferIterator.next();
+        }
+
+        int byteBuffersWritten = 0;
+
+        if(_transportEncryption == TransportEncryption.NONE)
+        {
+
+
+            long written = _socketChannel.write(bufArray);
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Written " + written + " bytes");
+            }
+
+            // Gathering writes drain the array in order, so only a leading run of
+            // exhausted buffers may be dropped from the head of the queue.
+            while (byteBuffersWritten < bufArray.length
+                   && !bufArray[byteBuffersWritten].hasRemaining())
+            {
+                byteBuffersWritten++;
+                _buffers.poll();
+            }
+
+
+            return bufArray.length == byteBuffersWritten;
+        }
+        else if(_transportEncryption == TransportEncryption.TLS)
+        {
+            int remaining = 0;
+
+            do
+            {
+                if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
+                {
+                    final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
+                    _status = _sslEngine.wrap(bufArray, netBuffer);
+                    runSSLEngineTasks(_status);
+
+                    netBuffer.flip();
+                    remaining = netBuffer.remaining();
+                    if (remaining != 0)
+                    {
+                        _encryptedOutput.add(netBuffer);
+                    }
+                    // Resume after the buffers already dequeued: rescanning from
+                    // index 0 on each pass would poll() the same buffers again.
+                    while (byteBuffersWritten < bufArray.length
+                           && !bufArray[byteBuffersWritten].hasRemaining())
+                    {
+                        byteBuffersWritten++;
+                        _buffers.poll();
+                    }
+                }
+
+            }
+            while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
+
+            ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
+            long written = _socketChannel.write(encryptedBuffers);
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Written " + written + " encrypted bytes");
+            }
+
+            ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
+            while(iter.hasNext())
+            {
+                ByteBuffer buf = iter.next();
+                if(buf.remaining() == 0)
+                {
+                    iter.remove();
+                }
+                else
+                {
+                    break;
+                }
+            }
+            // wrapped records still waiting for the socket count as unwritten output
+            return bufArray.length == byteBuffersWritten && _encryptedOutput.isEmpty();
+
+        }
+        else
+        {
+            return true;
+        }
+    }
+    /** Reads available bytes and hands them to the protocol engine: plain, TLS-unwrapped, or sniffed first when the encryption is not yet known. */
+    private void doRead() throws IOException
+    {
+
+        if(_transportEncryption == TransportEncryption.NONE)
+        {
+            int remaining = 0;
+            while (remaining == 0 && !_closed.get())
+            {
+                if (_currentBuffer == null || _currentBuffer.remaining() == 0)
+                {
+                    _currentBuffer = ByteBuffer.allocate(_receiveBufSize);
+                }
+                int read = _socketChannel.read(_currentBuffer);
+                if (read == -1)
+                {
+                    _closed.set(true);
+                }
+                remaining = _currentBuffer.remaining();
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Read " + read + " byte(s)");
+                }
+                ByteBuffer dup = _currentBuffer.duplicate();
+                dup.flip();
+                _currentBuffer = _currentBuffer.slice();
+                _receiver.received(dup);
+            }
+        }
+        else if(_transportEncryption == TransportEncryption.TLS)
+        {
+            int read = 1;
+            while(!_closed.get() && read > 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
+            {
+                read = _socketChannel.read(_netInputBuffer);
+                if (read == -1)
+                {
+                    _closed.set(true);
+                }
+
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer);
+                }
+                _netInputBuffer.flip();
+
+                // keep unwrapping until a pass produces no application data
+                int unwrapped = 0;
+                do
+                {
+                    ByteBuffer appInputBuffer =
+                            ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
+
+                    _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer);
+                    runSSLEngineTasks(_status);
+
+                    appInputBuffer.flip();
+                    unwrapped = appInputBuffer.remaining();
+                    _receiver.received(appInputBuffer);
+                }
+                while(unwrapped > 0);
+
+                _netInputBuffer.compact();
+
+            }
+        }
+        else
+        {
+            int read = 1;
+            while (!_closed.get() && read > 0)
+            {
+                // encryption not yet known: buffer input until enough bytes have arrived to test for a TLS ClientHello
+                read = _socketChannel.read(_netInputBuffer);
+                if (read == -1)
+                {
+                    _closed.set(true);
+                }
+
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer);
+                }
+
+                if (_netInputBuffer.position() >= NUMBER_OF_BYTES_FOR_TLS_CHECK)
+                {
+                    _netInputBuffer.flip();
+                    final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK];
+                    ByteBuffer dup = _netInputBuffer.duplicate();
+                    dup.get(headerBytes);
+
+                    _transportEncryption = looksLikeSSL(headerBytes) ? TransportEncryption.TLS : TransportEncryption.NONE;
+                    LOGGER.debug("Identified transport encryption as " + _transportEncryption);
+
+                    if (_transportEncryption == TransportEncryption.NONE)
+                    {
+                        _receiver.received(_netInputBuffer);
+                    }
+                    else
+                    {
+                        _onTransportEncryptionAction.run();
+                        _netInputBuffer.compact();
+                        doRead();
+                    }
+                    break;
+                }
+            }
+        }
+    }
+    /** Runs all delegated SSLEngine tasks inline on the calling thread when the result reports NEED_TASK. */
+    private void runSSLEngineTasks(final SSLEngineResult status)
+    {
+        if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
+        {
+            Runnable task;
+            while((task = _sslEngine.getDelegatedTask()) != null)
+            {
+                task.run();
+            }
+        }
+    }
+    /** Returns true when the first bytes match either of the two ClientHello layouts checked below. */
+    private boolean looksLikeSSL(byte[] headerBytes)
+    {
+        return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
+    }
+
+    private boolean looksLikeSSLv3ClientHello(byte[] headerBytes)
+    {
+        return headerBytes[0] == 22 && // SSL Handshake
+                   (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
+                    (headerBytes[2] == 0 || // SSL 3.0
+                     headerBytes[2] == 1 || // TLS 1.0
+                     headerBytes[2] == 2 || // TLS 1.1
+                     headerBytes[2] == 3)) && // TLS1.2
+                   (headerBytes[5] == 1); // client_hello
+    }
+
+    private boolean looksLikeSSLv2ClientHello(byte[] headerBytes)
+    {
+        return headerBytes[0] == -128 &&
+               headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
+                (headerBytes[4] == 0 || // SSL 3.0
+                 headerBytes[4] == 1 || // TLS 1.0
+                 headerBytes[4] == 2 || // TLS 1.1
+                 headerBytes[4] == 3);
+    }
+    /** Returns the TLS session's peer principal, or null when there is no SSLEngine or the peer is unverified. */
+    public Principal getPeerPrincipal()
+    {
+
+        if (_sslEngine != null)
+        {
+            try
+            {
+                return _sslEngine.getSession().getPeerPrincipal();
+            }
+            catch (SSLPeerUnverifiedException e)
+            {
+                return null;
+            }
+        }
+
+        return null;
+    }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
index 098f2fb..fa1801c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
@@ -103,11 +103,6 @@
}
}
- public void setIdleTimeout(int i)
- {
- delegate.setIdleTimeout(i);
- }
-
public void securityLayerEstablished()
{
appData = new byte[getSendBuffSize()];
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
index 24f95d7..e69de29 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.security.ssl;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
-import javax.net.ssl.SSLEngineResult.Status;
-import javax.net.ssl.SSLException;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.util.Logger;
-
-public class SSLBufferingSender implements Sender<ByteBuffer>
-{
- private static final Logger log = Logger.get(SSLBufferingSender.class);
- private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
-
- private final Sender<ByteBuffer> delegate;
- private final SSLEngine engine;
- private final int sslBufSize;
- private final ByteBuffer netData;
- private final SSLStatus _sslStatus;
-
- private String _hostname;
-
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private ByteBuffer _appData = EMPTY_BYTE_BUFFER;
-
-
- public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
- {
- this.engine = engine;
- this.delegate = delegate;
- sslBufSize = engine.getSession().getPacketBufferSize();
- netData = ByteBuffer.allocate(sslBufSize);
- _sslStatus = sslStatus;
- }
-
- public void setHostname(String hostname)
- {
- _hostname = hostname;
- }
-
- public void close()
- {
- if (!closed.getAndSet(true))
- {
- if (engine.isOutboundDone())
- {
- return;
- }
- log.debug("Closing SSL connection");
- doSend();
- engine.closeOutbound();
- try
- {
- tearDownSSLConnection();
- }
- catch(Exception e)
- {
- throw new SenderException("Error closing SSL connection",e);
- }
-
-
- synchronized(_sslStatus.getSslLock())
- {
- while (!engine.isOutboundDone())
- {
- try
- {
- _sslStatus.getSslLock().wait();
- }
- catch(InterruptedException e)
- {
- // pass
- }
-
- }
- }
- delegate.close();
- }
- }
-
- private void tearDownSSLConnection() throws Exception
- {
- SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData);
- Status status = result.getStatus();
- int read = result.bytesProduced();
- while (status != Status.CLOSED)
- {
- if (status == Status.BUFFER_OVERFLOW)
- {
- netData.clear();
- }
- if(read > 0)
- {
- int limit = netData.limit();
- netData.limit(netData.position());
- netData.position(netData.position() - read);
-
- ByteBuffer data = netData.slice();
-
- netData.limit(limit);
- netData.position(netData.position() + read);
-
- delegate.send(data);
- flush();
- }
- result = engine.wrap(ByteBuffer.allocate(0), netData);
- status = result.getStatus();
- read = result.bytesProduced();
- }
- }
-
- public void flush()
- {
- delegate.flush();
- }
-
- public void send()
- {
- if(!closed.get())
- {
- doSend();
- }
- }
-
- public synchronized void send(ByteBuffer appData)
- {
- boolean buffered;
- if(buffered = _appData.hasRemaining())
- {
- ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining());
- newBuf.put(_appData);
- newBuf.put(appData);
- newBuf.flip();
- _appData = newBuf;
- }
- if (closed.get())
- {
- throw new SenderException("SSL Sender is closed");
- }
- doSend();
- if(!appData.hasRemaining())
- {
- _appData = EMPTY_BYTE_BUFFER;
- }
- else if(!buffered)
- {
- _appData = ByteBuffer.allocate(appData.remaining());
- _appData.put(appData);
- _appData.flip();
- }
- }
-
- private synchronized void doSend()
- {
-
- HandshakeStatus handshakeStatus;
- Status status;
-
- while((_appData.hasRemaining() || engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
- && !_sslStatus.getSslErrorFlag())
- {
- int read = 0;
- try
- {
- SSLEngineResult result = engine.wrap(_appData, netData);
- read = result.bytesProduced();
- status = result.getStatus();
- handshakeStatus = result.getHandshakeStatus();
- }
- catch(SSLException e)
- {
- // Should this set _sslError??
- throw new SenderException("SSL, Error occurred while encrypting data",e);
- }
-
- if(read > 0)
- {
- int limit = netData.limit();
- netData.limit(netData.position());
- netData.position(netData.position() - read);
-
- ByteBuffer data = netData.slice();
-
- netData.limit(limit);
- netData.position(netData.position() + read);
-
- delegate.send(data);
- }
-
- switch(status)
- {
- case CLOSED:
- throw new SenderException("SSLEngine is closed");
-
- case BUFFER_OVERFLOW:
- netData.clear();
- continue;
-
- case OK:
- break; // do nothing
-
- default:
- throw new IllegalStateException("SSLReceiver: Invalid State " + status);
- }
-
- switch (handshakeStatus)
- {
- case NEED_WRAP:
- if (netData.hasRemaining())
- {
- continue;
- }
-
- case NEED_TASK:
- doTasks();
- break;
-
- case NEED_UNWRAP:
- flush();
- return;
-
- case FINISHED:
- if (_hostname != null)
- {
- SSLUtil.verifyHostname(engine, _hostname);
- }
-
- case NOT_HANDSHAKING:
- break; //do nothing
-
- default:
- throw new IllegalStateException("SSLSender: Invalid State " + status);
- }
-
- }
- }
-
- private void doTasks()
- {
- Runnable runnable;
- while ((runnable = engine.getDelegatedTask()) != null) {
- runnable.run();
- }
- }
-
- public void setIdleTimeout(int i)
- {
- delegate.setIdleTimeout(i);
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
index 7c61136..7d64012 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
@@ -264,8 +264,4 @@
}
}
- public void setIdleTimeout(int i)
- {
- delegate.setIdleTimeout(i);
- }
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
index 3071594..865a360 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
@@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
+import java.util.Set;
import javax.net.ssl.SSLContext;
@@ -150,7 +151,9 @@
}
public void accept(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory, SSLContext sslContext)
+ ProtocolEngineFactory factory,
+ SSLContext sslContext,
+ final Set<TransportEncryption> encryptionSet)
{
throw new UnsupportedOperationException();
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
index 3025414..cdd5e13 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
@@ -39,7 +39,7 @@
private Session session;
private AMQQueue queue;
private MessageConsumer consumer;
- private int numMessages;
+ private int _numMessages;
private static final Logger _logger = LoggerFactory.getLogger(AMQQueueDeferredOrderingTest.class);
@@ -86,7 +86,7 @@
{
super.setUp();
- numMessages = isBrokerStorePersistent() ? 300 : 1000;
+ _numMessages = isBrokerStorePersistent() ? 300 : 1000;
_logger.info("Create Connection");
con = getConnection();
@@ -106,30 +106,33 @@
// Setup initial messages
_logger.info("Creating first producer thread");
- producerThread = new ASyncProducer(queue, 0, numMessages / 2);
+ producerThread = new ASyncProducer(queue, 0, _numMessages / 2);
producerThread.start();
// Wait for them to be done
producerThread.join();
// Setup second set of messages to produce while we consume
_logger.info("Creating second producer thread");
- producerThread = new ASyncProducer(queue, numMessages / 2, numMessages);
+ producerThread = new ASyncProducer(queue, _numMessages / 2, _numMessages);
producerThread.start();
// Start consuming and checking they're in order
_logger.info("Consuming messages");
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < _numMessages; i++)
{
Message msg = consumer.receive(3000);
+
+ _logger.debug("KWDEBUG got " + msg);
+
assertNotNull("Message " + i + " should not be null", msg);
assertTrue("Message " + i + " should be a text message", msg instanceof TextMessage);
- assertEquals("Message content " + i + "does not match expected", Integer.toString(i), ((TextMessage) msg).getText());
+ assertEquals("Message content " + i + " does not match expected", Integer.toString(i), ((TextMessage) msg).getText());
}
}
protected void tearDown() throws Exception
{
- _logger.info("Interuptting producer thread");
+ _logger.info("Interrupting producer thread");
producerThread.interrupt();
_logger.info("Closing connection");
con.close();
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
index 007772e..9a8d021 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
@@ -161,7 +161,7 @@
when(port.getContextValue(eq(Long.class),eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l);
MultiVersionProtocolEngineFactory factory =
- new MultiVersionProtocolEngineFactory(_broker, null, false, false, protocols, null, port,
+ new MultiVersionProtocolEngineFactory(_broker, protocols, null, port,
org.apache.qpid.server.model.Transport.TCP);
//create a dummy to retrieve the 'current' ID number
@@ -215,7 +215,7 @@
try
{
- new MultiVersionProtocolEngineFactory(_broker, null, false, false, versions, Protocol.AMQP_0_9, null,
+ new MultiVersionProtocolEngineFactory(_broker, versions, Protocol.AMQP_0_9, null,
org.apache.qpid.server.model.Transport.TCP);
fail("should not have been allowed to create the factory");
}
@@ -236,10 +236,6 @@
{
_sender = new Sender<ByteBuffer>()
{
- public void setIdleTimeout(int i)
- {
- }
-
public void send(ByteBuffer msg)
{
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
index 3ffa73b..347bf93 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -114,11 +114,6 @@
_sender = new Sender<ByteBuffer>()
{
- public void setIdleTimeout(int i)
- {
-
- }
-
public void send(ByteBuffer msg)
{
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
index f762038..5928a41 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
@@ -313,14 +313,10 @@
_maxFrameSize,
actualHeartbeatInterval);
- int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
conn.setMaxFrameSize(_maxFrameSize);
-
- conn.setIdleTimeout(idleTimeout);
-
int channelMax = tune.getChannelMax();
conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax);
diff --git a/qpid/java/test-profiles/JavaExcludes b/qpid/java/test-profiles/JavaExcludes
index cafb1d5..b52987f 100644
--- a/qpid/java/test-profiles/JavaExcludes
+++ b/qpid/java/test-profiles/JavaExcludes
@@ -26,3 +26,4 @@
//QPID-4153 Messages causing a runtime selector error should be dead-lettered (or something similar)
org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError
+