QPID-7633: [Java Broker] address review comments
* remove redundant declaration of methods from AbstractAMQPSession
* pass the receiving channels to the Session_1_0 ctor and remove Session_1_0#setReceivingChannel and Session_1_0#setSendingChannel
* get rid of uses of AMQPConnection_1_0Impl in favour of AMQPConnection_1_0
* rename sessionModel to session in various places
* rename channel to session in AbstractQueue
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index b913508..915e1d9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -37,7 +37,6 @@
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
-import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.transport.AMQPConnection;
public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>> implements ConsumerTarget<T>
@@ -97,7 +96,7 @@
{
@SuppressWarnings("unchecked")
final T target = (T) this;
- getSessionModel().notifyWork(target);
+ getSession().notifyWork(target);
}
protected final void setNotifyWorkDesired(final boolean desired)
@@ -108,12 +107,12 @@
{
if (desired)
{
- getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+ getSession().removeTicker(_suspendedConsumerLoggingTicker);
}
else
{
_suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
- getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
+ getSession().addTicker(_suspendedConsumerLoggingTicker);
}
}
@@ -134,7 +133,7 @@
@Override
public boolean processPending()
{
- if (!getSessionModel().getAMQPConnection().isIOThread())
+ if (!getSession().getAMQPConnection().isIOThread())
{
return false;
}
@@ -172,7 +171,7 @@
private ListenableFuture<Void> doOnIoThreadAsync(final Runnable task)
{
- return getSessionModel().getAMQPConnection().doOnIOThreadAsync(task);
+ return getSession().getAMQPConnection().doOnIOThreadAsync(task);
}
private void consumerRemovedInternal(final MessageInstanceConsumer sub)
@@ -279,7 +278,7 @@
}
if (_suspendedConsumerLoggingTicker != null)
{
- getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+ getSession().removeTicker(_suspendedConsumerLoggingTicker);
}
return true;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index b2b6320..1fa7fb5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -58,7 +58,7 @@
long getUnacknowledgedMessages();
- AMQPSession<?,T> getSessionModel();
+ AMQPSession<?,T> getSession();
void send(final MessageInstanceConsumer<T> consumer, MessageInstance entry, boolean batch);
diff --git a/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
index 45fb744..5ef7c37 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
@@ -138,7 +138,7 @@
final Collection<QueueConsumer<?,?>> consumers = _queue.getConsumers();
for(QueueConsumer<?,?> c : consumers)
{
- if(c.getSessionModel().getConnectionReference() == message.getConnectionReference())
+ if(c.getSession().getConnectionReference() == message.getConnectionReference())
{
return false;
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java b/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
index 3f43f01..2302665 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
@@ -80,7 +80,7 @@
long getUnacknowledgedMessages();
- AMQPSession<?,?> getSessionModel();
+ AMQPSession<?,?> getSession();
long getConsumerNumber();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 6b04aac..39e309e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -208,7 +208,8 @@
private AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Set<AMQPSession<?,?>> _blockedChannels = Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?,?>, Boolean>());
+ private final Set<AMQPSession<?, ?>> _blockedSessions =
+ Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?, ?>, Boolean>());
private final AtomicBoolean _deleted = new AtomicBoolean(false);
private final SettableFuture<Integer> _deleteFuture = SettableFuture.create();
@@ -357,36 +358,36 @@
_queueHouseKeepingTask = new AdvanceConsumersTask();
Subject activeSubject = Subject.getSubject(AccessController.getContext());
Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
- AMQPSession<?, ?> sessionModel;
+ AMQPSession<?, ?> session;
if(sessionPrincipals.isEmpty())
{
- sessionModel = null;
+ session = null;
}
else
{
final SessionPrincipal sessionPrincipal = sessionPrincipals.iterator().next();
- sessionModel = sessionPrincipal.getSession();
+ session = sessionPrincipal.getSession();
}
- if(sessionModel != null)
+ if(session != null)
{
switch(_exclusive)
{
case PRINCIPAL:
- _exclusiveOwner = sessionModel.getAMQPConnection().getAuthorizedPrincipal();
+ _exclusiveOwner = session.getAMQPConnection().getAuthorizedPrincipal();
break;
case CONTAINER:
- _exclusiveOwner = sessionModel.getAMQPConnection().getRemoteContainerName();
+ _exclusiveOwner = session.getAMQPConnection().getRemoteContainerName();
break;
case CONNECTION:
- _exclusiveOwner = sessionModel.getAMQPConnection();
- addExclusivityConstraint(sessionModel.getAMQPConnection());
+ _exclusiveOwner = session.getAMQPConnection();
+ addExclusivityConstraint(session.getAMQPConnection());
break;
case SESSION:
- _exclusiveOwner = sessionModel;
- addExclusivityConstraint(sessionModel);
+ _exclusiveOwner = session;
+ addExclusivityConstraint(session);
break;
case NONE:
case LINK:
@@ -427,9 +428,9 @@
if(getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE)
{
- if(sessionModel != null)
+ if(session != null)
{
- addLifetimeConstraint(sessionModel.getAMQPConnection());
+ addLifetimeConstraint(session.getAMQPConnection());
}
else
{
@@ -440,9 +441,9 @@
}
else if(getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END)
{
- if(sessionModel != null)
+ if(session != null)
{
- addLifetimeConstraint(sessionModel);
+ addLifetimeConstraint(session);
}
else
{
@@ -790,12 +791,12 @@
case CONNECTION:
if(exclusiveOwner == null)
{
- exclusiveOwner = target.getSessionModel().getAMQPConnection();
- addExclusivityConstraint(target.getSessionModel().getAMQPConnection());
+ exclusiveOwner = target.getSession().getAMQPConnection();
+ addExclusivityConstraint(target.getSession().getAMQPConnection());
}
else
{
- if(exclusiveOwner != target.getSessionModel().getAMQPConnection())
+ if(exclusiveOwner != target.getSession().getAMQPConnection())
{
throw new ConsumerAccessRefused();
}
@@ -804,12 +805,12 @@
case SESSION:
if(exclusiveOwner == null)
{
- exclusiveOwner = target.getSessionModel();
- addExclusivityConstraint(target.getSessionModel());
+ exclusiveOwner = target.getSession();
+ addExclusivityConstraint(target.getSession());
}
else
{
- if(exclusiveOwner != target.getSessionModel())
+ if(exclusiveOwner != target.getSession())
{
throw new ConsumerAccessRefused();
}
@@ -822,7 +823,7 @@
}
break;
case PRINCIPAL:
- Principal currentAuthorizedPrincipal = target.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
+ Principal currentAuthorizedPrincipal = target.getSession().getAMQPConnection().getAuthorizedPrincipal();
if(exclusiveOwner == null)
{
exclusiveOwner = currentAuthorizedPrincipal;
@@ -838,11 +839,11 @@
case CONTAINER:
if(exclusiveOwner == null)
{
- exclusiveOwner = target.getSessionModel().getAMQPConnection().getRemoteContainerName();
+ exclusiveOwner = target.getSession().getAMQPConnection().getRemoteContainerName();
}
else
{
- if(!exclusiveOwner.equals(target.getSessionModel().getAMQPConnection().getRemoteContainerName()))
+ if(!exclusiveOwner.equals(target.getSession().getAMQPConnection().getRemoteContainerName()))
{
throw new ConsumerAccessRefused();
}
@@ -1734,7 +1735,7 @@
}
@Override
- public void checkCapacity(AMQPSession<?,?> channel)
+ public void checkCapacity(AMQPSession<?,?> session)
{
if(_queueFlowControlSizeBytes != 0L)
{
@@ -1745,9 +1746,9 @@
getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_queueStatistics.getQueueSize(),
_queueFlowControlSizeBytes));
- _blockedChannels.add(channel);
+ _blockedSessions.add(session);
- channel.block(this);
+ session.block(this);
if(_queueStatistics.getQueueSize() <= _queueFlowResumeSizeBytes)
{
@@ -1756,8 +1757,8 @@
getEventLogger().message(_logSubject,
QueueMessages.UNDERFULL(_queueStatistics.getQueueSize(), _queueFlowResumeSizeBytes));
- channel.unblock(this);
- _blockedChannels.remove(channel);
+ session.unblock(this);
+ _blockedSessions.remove(session);
}
}
@@ -1784,10 +1785,10 @@
_queueFlowResumeSizeBytes));
}
- for (final AMQPSession<?,?> blockedChannel : _blockedChannels)
+ for (final AMQPSession<?,?> blockedSession : _blockedSessions)
{
- blockedChannel.unblock(this);
- _blockedChannels.remove(blockedChannel);
+ blockedSession.unblock(this);
+ _blockedSessions.remove(blockedSession);
}
}
}
@@ -2610,7 +2611,7 @@
allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getAMQPConnection().getRemoteContainerName());
break;
case LINK:
- allowed = _exclusiveSubscriber == null || _exclusiveSubscriber.getSessionModel() == session;
+ allowed = _exclusiveSubscriber == null || _exclusiveSubscriber.getSession() == session;
break;
default:
throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusive);
@@ -2691,9 +2692,9 @@
if(session == null)
{
- session = c.getSessionModel();
+ session = c.getSession();
}
- else if(!session.equals(c.getSessionModel()))
+ else if(!session.equals(c.getSession()))
{
throw new ExistingConsumerPreventsExclusive();
}
@@ -2701,7 +2702,7 @@
_exclusiveOwner = session;
break;
case LINK:
- _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection();
+ _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSession().getAMQPConnection();
}
}
@@ -2719,9 +2720,9 @@
QueueConsumer<?,?> c = queueConsumerIterator.next();
if(con == null)
{
- con = c.getSessionModel().getAMQPConnection();
+ con = c.getSession().getAMQPConnection();
}
- else if(!con.equals(c.getSessionModel().getAMQPConnection()))
+ else if(!con.equals(c.getSession().getAMQPConnection()))
{
throw new ExistingConsumerPreventsExclusive();
}
@@ -2732,7 +2733,7 @@
_exclusiveOwner = _exclusiveOwner == null ? null : ((AMQPSession<?,?>)_exclusiveOwner).getAMQPConnection();
break;
case LINK:
- _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection();
+ _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSession().getAMQPConnection();
}
}
@@ -2749,9 +2750,9 @@
QueueConsumer<?,?> c = queueConsumerIterator.next();
if(containerID == null)
{
- containerID = c.getSessionModel().getAMQPConnection().getRemoteContainerName();
+ containerID = c.getSession().getAMQPConnection().getRemoteContainerName();
}
- else if(!containerID.equals(c.getSessionModel().getAMQPConnection().getRemoteContainerName()))
+ else if(!containerID.equals(c.getSession().getAMQPConnection().getRemoteContainerName()))
{
throw new ExistingConsumerPreventsExclusive();
}
@@ -2765,7 +2766,7 @@
_exclusiveOwner = _exclusiveOwner == null ? null : ((AMQPSession<?,?>)_exclusiveOwner).getAMQPConnection().getRemoteContainerName();
break;
case LINK:
- _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection().getRemoteContainerName();
+ _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSession().getAMQPConnection().getRemoteContainerName();
}
}
@@ -2782,10 +2783,10 @@
QueueConsumer<?,?> c = queueConsumerIterator.next();
if(principal == null)
{
- principal = c.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
+ principal = c.getSession().getAMQPConnection().getAuthorizedPrincipal();
}
else if(!Objects.equals(principal.getName(),
- c.getSessionModel().getAMQPConnection().getAuthorizedPrincipal().getName()))
+ c.getSession().getAMQPConnection().getAuthorizedPrincipal().getName()))
{
throw new ExistingConsumerPreventsExclusive();
}
@@ -2799,7 +2800,7 @@
_exclusiveOwner = _exclusiveOwner == null ? null : ((AMQPSession<?,?>)_exclusiveOwner).getAMQPConnection().getAuthorizedPrincipal();
break;
case LINK:
- _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
+ _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSession().getAMQPConnection().getAuthorizedPrincipal();
}
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 316f661..a5ed736 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -112,9 +112,9 @@
final Integer priority)
{
super(queue,
- createAttributeMap(target.getSessionModel(), consumerName, filters, optionSet, priority));
+ createAttributeMap(target.getSession(), consumerName, filters, optionSet, priority));
_messageClass = messageClass;
- _sessionReference = target.getSessionModel().getConnectionReference();
+ _sessionReference = target.getSession().getConnectionReference();
_consumerNumber = CONSUMER_NUMBER_GENERATOR.getAndIncrement();
_filters = filters;
_acquires = optionSet.contains(ConsumerOption.ACQUIRES);
@@ -132,7 +132,7 @@
setupLogging();
}
- private static Map<String, Object> createAttributeMap(final AMQPSession<?,?> sessionModel,
+ private static Map<String, Object> createAttributeMap(final AMQPSession<?,?> session,
String linkName,
FilterManager filters,
EnumSet<ConsumerOption> optionSet,
@@ -140,9 +140,9 @@
{
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(ID, UUID.randomUUID());
- String name = sessionModel.getAMQPConnection().getConnectionId()
+ String name = session.getAMQPConnection().getConnectionId()
+ "|"
- + sessionModel.getChannelId()
+ + session.getChannelId()
+ "|"
+ linkName;
attributes.put(NAME, name);
@@ -215,9 +215,9 @@
}
@Override
- public AMQPSession<?,?> getSessionModel()
+ public AMQPSession<?,?> getSession()
{
- return _target.getSessionModel();
+ return _target.getSession();
}
@Override
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index 39e497e..3b3d76c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -22,7 +22,6 @@
import java.security.AccessControlContext;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
@@ -50,7 +49,6 @@
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Session;
@@ -172,13 +170,6 @@
return getBlocking();
}
- public abstract boolean getBlocking();
-
- public abstract Collection<Consumer<?,X>> getConsumers();
-
- @Override
- public abstract long getConsumerCount();
-
@Override
public int getLocalTransactionOpen()
{
@@ -192,12 +183,6 @@
return getTxnStart();
}
- public abstract long getTxnRejects();
-
- public abstract long getTxnCommits();
-
- public abstract long getTxnStart();
-
public long getLocalTransactionRollbacks()
{
return getTxnRejects();
@@ -221,8 +206,6 @@
_taskList.remove(task);
}
- public abstract int getUnacknowledgedMessageCount();
-
@Override
public Date getTransactionStartTime()
{
@@ -375,17 +358,11 @@
_connection.getAggregateTicker().removeTicker(ticker);
}
- public abstract void doTimeoutAction(final String idleTransactionTimeoutError);
-
public LogSubject getLogSubject()
{
return _logSubject;
}
- public abstract long getTransactionUpdateTimeLong();
-
- public abstract long getTransactionStartTimeLong();
-
@Override
protected void logOperation(final String operation)
{
@@ -431,8 +408,6 @@
}
}
- public abstract void transportStateChanged();
-
protected abstract void updateBlockedStateIfNecessary();
public abstract boolean isClosing();
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
index 6319a99..bf86051 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
@@ -85,7 +85,7 @@
return queue;
}
- public AMQPSession getSessionModel()
+ public AMQPSession getSession()
{
return _sessionModel;
}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 57c2a7d..47feaf5 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -411,6 +411,11 @@
});
}
+ void acknowledge(final MessageInstanceConsumer consumer, final MessageInstance entry)
+ {
+ _session.acknowledge(consumer, this, entry);
+ }
+
void reject(final MessageInstanceConsumer consumer, final MessageInstance entry)
{
entry.setRedelivered();
@@ -429,7 +434,7 @@
entry.setRedelivered();
}
- if (getSessionModel().isClosing() || !setRedelivered)
+ if (getSession().isClosing() || !setRedelivered)
{
entry.decrementDeliveryCount();
}
@@ -487,7 +492,7 @@
protected EventLogger getEventLogger()
{
- return getSessionModel().getAMQPConnection().getEventLogger();
+ return getSession().getAMQPConnection().getEventLogger();
}
private boolean isMaxDeliveryLimitReached(MessageInstance entry)
@@ -573,16 +578,11 @@
stop();
}
- public Session_0_10 getSessionModel()
+ public Session_0_10 getSession()
{
return _session.getModelObject();
}
- public ServerSession getSession()
- {
- return _session;
- }
-
public boolean isDurable()
{
return false;
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
index 17a0b6b..5617833 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
@@ -47,7 +47,7 @@
public void onAccept()
{
- _target.getSession().acknowledge(_consumer, _target, _entry);
+ _target.acknowledge(_consumer, _entry);
}
public void onRelease(boolean setRedelivered)
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index d0941bf..8320bd7 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -328,7 +328,7 @@
return _targetAddress;
}
- public AMQChannel getSessionModel()
+ public AMQChannel getSession()
{
return _channel;
}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
index 109886d..d149783 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
@@ -27,6 +27,7 @@
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
@@ -35,8 +36,13 @@
@ManagedObject(category = false, creatable = false, type="AMQP_1_0")
public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQPConnection<C>,
- ProtocolEngine, EventLoggerProvider
+ ProtocolEngine,
+ ConnectionHandler,
+ EventLoggerProvider
{
+ Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+ Symbol SHARED_SUBSCRIPTIONS = Symbol.valueOf("SHARED-SUBS");
+
Object getReference();
String getRemoteContainerId();
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index f9b85e1..d64ac26 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -116,10 +116,10 @@
import org.apache.qpid.transport.util.Functions;
public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>
- implements FrameOutputHandler, DescribedTypeConstructorRegistry.Source,
+ implements FrameOutputHandler,
+ DescribedTypeConstructorRegistry.Source,
ValueWriter.Registry.Source,
SASLEndpoint,
- ConnectionHandler,
AMQPConnection_1_0<AMQPConnection_1_0Impl>
{
@@ -154,9 +154,6 @@
(byte) 0
};
- private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
- public static final Symbol SHARED_SUBSCRIPTIONS = Symbol.valueOf("SHARED-SUBS");
-
private FrameWriter _frameWriter;
private ProtocolHandler _frameHandler;
private volatile boolean _transportBlockedForWriting;
@@ -570,16 +567,16 @@
}
- public void receiveBegin(final short channel, final Begin begin)
+ public void receiveBegin(final short receivingChannelId, final Begin begin)
{
assertState(FrameReceivingState.ANY_FRAME);
- short myChannelId;
+ short sendingChannelId;
if (begin.getRemoteChannel() != null)
{
closeConnection(ConnectionError.FRAMING_ERROR,
"BEGIN received on channel "
- + channel
+ + receivingChannelId
+ " with given remote-channel "
+ begin.getRemoteChannel()
+ ". Since the broker does not spontaneously start channels, this must be an error.");
@@ -588,40 +585,37 @@
else // Peer requesting session creation
{
- if (_receivingSessions[channel] == null)
+ if (_receivingSessions[receivingChannelId] == null)
{
- myChannelId = getFirstFreeChannel();
- if (myChannelId == -1)
+ sendingChannelId = getFirstFreeChannel();
+ if (sendingChannelId == -1)
{
closeConnection(ConnectionError.FRAMING_ERROR,
"BEGIN received on channel "
- + channel
+ + receivingChannelId
+ ". There are no free channels for the broker to respond on.");
}
- Session_1_0 session = new Session_1_0(this, begin, myChannelId);
+ Session_1_0 session = new Session_1_0(this, begin, sendingChannelId, receivingChannelId);
session.create();
- _receivingSessions[channel] = session;
- _sendingSessions[myChannelId] = session;
+ _receivingSessions[receivingChannelId] = session;
+ _sendingSessions[sendingChannelId] = session;
Begin beginToSend = new Begin();
-
- session.setReceivingChannel(channel);
- session.setSendingChannel(myChannelId);
- beginToSend.setRemoteChannel(UnsignedShort.valueOf(channel));
+ beginToSend.setRemoteChannel(UnsignedShort.valueOf(receivingChannelId));
beginToSend.setNextOutgoingId(session.getNextOutgoingId());
beginToSend.setOutgoingWindow(session.getOutgoingWindowSize());
beginToSend.setIncomingWindow(session.getIncomingWindowSize());
- sendFrame(myChannelId, beginToSend);
+ sendFrame(sendingChannelId, beginToSend);
_sessions.add(session);
}
else
{
closeConnection(ConnectionError.FRAMING_ERROR,
- "BEGIN received on channel " + channel + " which is already in use.");
+ "BEGIN received on channel " + receivingChannelId + " which is already in use.");
}
}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index abd55bb..4da1af4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -315,6 +315,7 @@
}
}
+ @Override
public Session_1_0 getSession()
{
return _link.getSession();
@@ -494,12 +495,6 @@
}
@Override
- public Session_1_0 getSessionModel()
- {
- return getSession();
- }
-
- @Override
public void acquisitionRemoved(final MessageInstance node)
{
}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index c85c582..f7f8ab6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -122,7 +122,6 @@
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
-import org.apache.qpid.transport.network.Ticker;
public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget_1_0>
implements LogSubject, org.apache.qpid.server.util.Deletable<Session_1_0>
@@ -150,7 +149,6 @@
private final Map<String, ReceivingLinkEndpoint> _receivingLinkMap = new HashMap<>();
private final Map<LinkEndpoint, UnsignedInteger> _localLinkEndpoints = new HashMap<>();
private final Map<UnsignedInteger, LinkEndpoint> _remoteLinkEndpoints = new HashMap<>();
- private long _lastAttachedTime;
private short _receivingChannel;
private final short _sendingChannel;
@@ -189,34 +187,28 @@
private volatile long _rolledBackTransactions;
private volatile int _unacknowledgedMessages;
- public Session_1_0(final AMQPConnection_1_0 connection, Begin begin, short sendingChannelId)
+ public Session_1_0(final AMQPConnection_1_0 connection,
+ Begin begin,
+ short sendingChannelId,
+ short receivingChannelId)
{
super(connection, sendingChannelId);
_sendingChannel = sendingChannelId;
- _sessionState = SessionState.BEGIN_RECVD;
+ _receivingChannel = receivingChannelId;
+ _sessionState = SessionState.ACTIVE;
_nextIncomingTransferId = new SequenceNumber(begin.getNextOutgoingId().intValue());
_connection = connection;
_primaryDomain = getPrimaryDomain();
- }
- public void setReceivingChannel(final short receivingChannel)
- {
- _receivingChannel = receivingChannel;
- switch(_sessionState)
+ AccessController.doPrivileged((new PrivilegedAction<Object>()
{
- case INACTIVE:
- _sessionState = SessionState.BEGIN_RECVD;
- break;
- case BEGIN_SENT:
- _sessionState = SessionState.ACTIVE;
- break;
- case END_PIPE:
- _sessionState = SessionState.END_SENT;
- break;
- default:
- // TODO error
-
- }
+ @Override
+ public Object run()
+ {
+ _connection.getEventLogger().message(ChannelMessages.CREATE());
+ return null;
+ }
+ }), _accessControllerContext);
}
public void sendDetach(final Detach detach)
@@ -545,33 +537,6 @@
sendFlow(new Flow());
}
- public void setSendingChannel(final short sendingChannel)
- {
- switch(_sessionState)
- {
- case INACTIVE:
- _sessionState = SessionState.BEGIN_SENT;
- break;
- case BEGIN_RECVD:
- _sessionState = SessionState.ACTIVE;
- break;
- default:
- // TODO error
-
- }
-
- AccessController.doPrivileged((new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- _connection.getEventLogger().message(ChannelMessages.CREATE());
-
- return null;
- }
- }), _accessControllerContext);
- }
-
public void sendFlow(final Flow flow)
{
if(_nextIncomingTransferId != null)
@@ -761,7 +726,7 @@
link = createSendingLink(endpoint, attach);
if (link != null)
{
- capabilities.add(AMQPConnection_1_0Impl.SHARED_SUBSCRIPTIONS);
+ capabilities.add(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS);
}
}
else if (endpoint.getTarget() instanceof Coordinator)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
index 5217c24..7406dcd 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
@@ -26,7 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0Impl;
+import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0;
import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
@@ -53,7 +53,7 @@
_isSasl = isSasl;
}
- public FrameHandler(final AMQPConnection_1_0Impl connection, final boolean sasl)
+ public FrameHandler(final AMQPConnection_1_0 connection, final boolean sasl)
{
this(new ValueHandler(connection.getDescribedTypeRegistry()), connection, sasl);
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index 81c0cd8..a1fa32b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -684,9 +684,7 @@
{
Begin begin = mock(Begin.class);
when(begin.getNextOutgoingId()).thenReturn(new UnsignedInteger(channelId));
- Session_1_0 session = new Session_1_0(connection, begin, (short) channelId);
- session.setReceivingChannel((short)channelId);
- session.setSendingChannel((short)channelId);
+ Session_1_0 session = new Session_1_0(connection, begin, (short) channelId, (short) channelId);
return session;
}
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 38f01d7..c4f5515 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -1007,7 +1007,7 @@
AMQPSession<?,?> publishingSession = sessionPrincipals.iterator().next().getSession();
for (ManagementNodeConsumer candidate : _consumers)
{
- if (candidate.getTarget().getTargetAddress().equals(replyTo) && candidate.getSessionModel().getConnectionReference() == publishingSession.getConnectionReference())
+ if (candidate.getTarget().getTargetAddress().equals(replyTo) && candidate.getSession().getConnectionReference() == publishingSession.getConnectionReference())
{
consumer = candidate;
break;
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 1119dbc..b6bcb64 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -106,9 +106,9 @@
}
}
- AMQPSession<?,?> getSessionModel()
+ AMQPSession<?,?> getSession()
{
- return _target.getSessionModel();
+ return _target.getSession();
}
@Override
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
index a0151d2..7df8887 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
@@ -273,9 +273,9 @@
}
@Override
- public AMQPSession getSessionModel()
+ public AMQPSession getSession()
{
- return _underlying.getSessionModel();
+ return _underlying.getSession();
}
@Override