QPID-7633: [Java Broker] Remove SessionAdapter
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index 1c2fba8..dbc87be 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -56,7 +56,7 @@
boolean isProducerFlowBlocked();
- Collection<Consumer> getConsumers();
+ Collection<? extends Consumer> getConsumers();
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Consumers")
long getConsumerCount();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index 298eece..a9a0bbd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -70,7 +70,7 @@
int getChannelId();
- int getConsumerCount();
+ long getConsumerCount();
Collection<Consumer<?,X>> getConsumers();
@@ -87,14 +87,14 @@
*
* @return the time this transaction started or 0 if not in a transaction
*/
- long getTransactionStartTime();
+ long getTransactionStartTimeLong();
/**
* Return the time of the last activity on the current transaction.
*
* @return the time of the last activity or 0 if not in a transaction
*/
- long getTransactionUpdateTime();
+ long getTransactionUpdateTimeLong();
void transportStateChanged();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
new file mode 100644
index 0000000..e0f3a52
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.session;
+
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.Deletable;
+
+public interface AMQPSession<S extends org.apache.qpid.server.session.AMQPSession<S, X>,
+ X extends ConsumerTarget<X>> extends Session<S>,
+ Deletable<S>,
+ EventLoggerProvider,
+ AMQSessionModel<S, X>
+{
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
similarity index 66%
rename from broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
rename to broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index 94acef6..5f74109 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.model.adapter;
+package org.apache.qpid.server.session;
import java.util.ArrayList;
import java.util.Collection;
@@ -26,80 +26,68 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.ConsumerListener;
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.TransactionTimeoutTicker;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.transport.network.Ticker;
-public final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> implements Session<SessionAdapter>
+public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>, X extends ConsumerTarget<X>>
+ extends AbstractConfiguredObject<S>
+ implements AMQPSession<S, X>, EventLoggerProvider
{
private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out";
private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out";
- // Attributes
- private final AMQSessionModel _session;
private final Action _deleteModelTask;
- private final AbstractAMQPConnection<?,?> _amqpConnection;
+ private final Connection<?> _amqpConnection;
- public SessionAdapter(final AbstractAMQPConnection<?,?> amqpConnection,
- final AMQSessionModel<?,?> session)
+ protected AbstractAMQPSession(final Connection<?> parent,
+ final int sessionId)
{
- super(amqpConnection, createAttributes(session));
- _amqpConnection = amqpConnection;
- _session = session;
- _session.addConsumerListener(new ConsumerListener()
- {
- @Override
- public void consumerAdded(final Consumer<?,?> consumer)
- {
- childAdded(consumer);
- }
-
- @Override
- public void consumerRemoved(final Consumer<?,?> consumer)
- {
- childRemoved(consumer);
-
- }
- });
- session.setModelObject(this);
+ super(parent, createAttributes(sessionId));
+ _amqpConnection = parent;
_deleteModelTask = new Action()
{
@Override
public void performAction(final Object object)
{
- session.removeDeleteTask(this);
+ removeDeleteTask(this);
deleteAsync();
}
};
- session.addDeleteTask(_deleteModelTask);
setState(State.ACTIVE);
}
- private static Map<String, Object> createAttributes(final AMQSessionModel session)
+ @Override
+ protected void onCreate()
+ {
+ super.onCreate();
+ addDeleteTask(_deleteModelTask);
+ }
+
+ private static Map<String, Object> createAttributes(final long sessionId)
{
Map<String, Object> attributes = new HashMap<>();
- attributes.put(ID, UUID.randomUUID());
- attributes.put(NAME, String.valueOf(session.getChannelId()));
+ attributes.put(NAME, sessionId);
attributes.put(DURABLE, false);
attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);
return attributes;
@@ -109,67 +97,67 @@
protected void postResolveChildren()
{
super.postResolveChildren();
- registerTransactionTimeoutTickers(_amqpConnection, _session);
+ registerTransactionTimeoutTickers(_amqpConnection);
}
@Override
- public int getChannelId()
- {
- return _session.getChannelId();
- }
+ public abstract int getChannelId();
@Override
public boolean isProducerFlowBlocked()
{
- return _session.getBlocking();
+ return getBlocking();
}
- public Collection<org.apache.qpid.server.model.Consumer> getConsumers()
- {
- return (Collection<Consumer>) _session.getConsumers();
- }
+ public abstract boolean getBlocking();
+
+ public abstract Collection<Consumer<?,X>> getConsumers();
@Override
- public long getConsumerCount()
+ public abstract long getConsumerCount();
+
+ @Override
+ public int getLocalTransactionOpen()
{
- return _session.getConsumerCount();
+ long open = getTxnStart() - (getTxnCommits() + getTxnRejects());
+ return (open > 0L) ? 1 : 0;
}
@Override
public long getLocalTransactionBegins()
{
- return _session.getTxnStart();
+ return getTxnStart();
}
- @Override
- public int getLocalTransactionOpen()
- {
- long open = _session.getTxnStart() - (_session.getTxnCommits() + _session.getTxnRejects());
- return (open > 0l) ? 1 : 0;
- }
+ public abstract long getTxnRejects();
- @Override
+ public abstract long getTxnCommits();
+
+ public abstract long getTxnStart();
+
public long getLocalTransactionRollbacks()
{
- return _session.getTxnRejects();
+ return getTxnRejects();
}
@Override
public long getUnacknowledgedMessages()
{
- return _session.getUnacknowledgedMessageCount();
+ return getUnacknowledgedMessageCount();
}
+ public abstract int getUnacknowledgedMessageCount();
+
@Override
public Date getTransactionStartTime()
{
- return new Date(_session.getTransactionStartTime());
+ return new Date(getTransactionStartTimeLong());
}
@Override
public Date getTransactionUpdateTime()
{
- return new Date(_session.getTransactionUpdateTime());
+ return new Date(getTransactionUpdateTimeLong());
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
@@ -177,17 +165,19 @@
{
deleted();
setState(State.DELETED);
- _session.removeDeleteTask(_deleteModelTask);
+ removeDeleteTask(_deleteModelTask);
return Futures.immediateFuture(null);
}
- private void registerTransactionTimeoutTickers(AbstractAMQPConnection<?,?> amqpConnection,
- final AMQSessionModel session)
+ @Override
+ public abstract EventLogger getEventLogger();
+
+ private void registerTransactionTimeoutTickers(Connection<?> amqpConnection)
{
NamedAddressSpace addressSpace = amqpConnection.getAddressSpace();
if (addressSpace instanceof QueueManagingVirtualHost)
{
- final EventLogger eventLogger = amqpConnection.getEventLogger();
+ final EventLogger eventLogger = getEventLogger();
final QueueManagingVirtualHost<?> virtualhost = (QueueManagingVirtualHost<?>) addressSpace;
final List<Ticker> tickers = new ArrayList<>(4);
@@ -196,7 +186,7 @@
@Override
public Long get()
{
- return SessionAdapter.this._session.getTransactionStartTime();
+ return getTransactionStartTimeLong();
}
};
final Supplier<Long> transactionUpdateTimeSupplier = new Supplier<Long>()
@@ -204,7 +194,7 @@
@Override
public Long get()
{
- return SessionAdapter.this._session.getTransactionUpdateTime();
+ return getTransactionUpdateTimeLong();
}
};
@@ -221,7 +211,7 @@
@Override
public void performAction(Long age)
{
- eventLogger.message(_session.getLogSubject(), ChannelMessages.OPEN_TXN(age));
+ eventLogger.message(getLogSubject(), ChannelMessages.OPEN_TXN(age));
}
}
));
@@ -236,7 +226,7 @@
@Override
public void performAction(Long age)
{
- _session.doTimeoutAction(OPEN_TRANSACTION_TIMEOUT_ERROR);
+ doTimeoutAction(OPEN_TRANSACTION_TIMEOUT_ERROR);
}
}
));
@@ -251,7 +241,7 @@
@Override
public void performAction(Long age)
{
- eventLogger.message(_session.getLogSubject(), ChannelMessages.IDLE_TXN(age));
+ eventLogger.message(getLogSubject(), ChannelMessages.IDLE_TXN(age));
}
}
));
@@ -266,7 +256,7 @@
@Override
public void performAction(Long age)
{
- _session.doTimeoutAction(IDLE_TRANSACTION_TIMEOUT_ERROR);
+ doTimeoutAction(IDLE_TRANSACTION_TIMEOUT_ERROR);
}
}
));
@@ -274,7 +264,7 @@
for (Ticker ticker : tickers)
{
- session.addTicker(ticker);
+ addTicker(ticker);
}
Action deleteTickerTask = new Action()
@@ -282,21 +272,34 @@
@Override
public void performAction(Object o)
{
- session.removeDeleteTask(this);
+ removeDeleteTask(this);
for (Ticker ticker : tickers)
{
- session.removeTicker(ticker);
+ removeTicker(ticker);
}
}
};
- session.addDeleteTask(deleteTickerTask);
+ addDeleteTask(deleteTickerTask);
}
}
+ public abstract void addTicker(final Ticker ticker);
+
+ public abstract void removeTicker(final Ticker ticker);
+
+ public abstract void doTimeoutAction(final String idleTransactionTimeoutError);
+
+ public abstract LogSubject getLogSubject();
+
+ public abstract long getTransactionUpdateTimeLong();
+
+ public abstract long getTransactionStartTimeLong();
+
@Override
protected void logOperation(final String operation)
{
- _amqpConnection.getEventLogger().message(ChannelMessages.OPERATION(operation));
+ getEventLogger().message(ChannelMessages.OPERATION(operation));
}
+
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
index 4d7a0c6..fd16010 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
@@ -64,9 +64,7 @@
import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.TaskExecutorProvider;
import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.adapter.SessionAdapter;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.sasl.SaslSettings;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -550,18 +548,6 @@
return _subject;
}
- public void sessionAdded(final AMQSessionModel<?,?> session)
- {
- SessionAdapter adapter = new SessionAdapter(this, session);
- adapter.create();
- childAdded(adapter);
-
- }
-
- public void sessionRemoved(final AMQSessionModel<?,?> session)
- {
- }
-
@Override
public TaskExecutor getChildExecutor()
{
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
index 93b6c87..c64cd20 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
@@ -347,7 +347,7 @@
}
@Override
- public int getConsumerCount()
+ public long getConsumerCount()
{
return 0;
}
@@ -377,13 +377,13 @@
}
@Override
- public long getTransactionStartTime()
+ public long getTransactionStartTimeLong()
{
return 0;
}
@Override
- public long getTransactionUpdateTime()
+ public long getTransactionUpdateTimeLong()
{
return 0;
}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index a5d82d4..88d6814 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -39,7 +39,6 @@
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
-import javax.security.sasl.SaslServer;
import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.server.logging.EventLogger;
@@ -326,20 +325,12 @@
public synchronized void registerSession(final Session ssn)
{
super.registerSession(ssn);
- _amqpConnection.sessionAdded((ServerSession) ssn);
if(_blocking)
{
((ServerSession)ssn).block();
}
}
- @Override
- public synchronized void removeSession(final Session ssn)
- {
- _amqpConnection.sessionRemoved((ServerSession) ssn);
- super.removeSession(ssn);
- }
-
public Collection<? extends ServerSession> getSessionModels()
{
return Collections.unmodifiableCollection(getChannels());
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 497a8ee..f2b5e30 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -184,9 +184,10 @@
{
SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
- ServerSession ssn = new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
+ final ServerSession serverSession =
+ new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
- return ssn;
+ return serverSession;
}
@Override
@@ -413,9 +414,10 @@
if(isSessionNameUnique(atc.getName(), conn))
{
-
serverConnection.map(ssn, atc.getChannel());
serverConnection.registerSession(ssn);
+ final Session_0_10 session = new Session_0_10(((ServerConnection) conn).getAmqpConnection(), ssn.getChannelId(), ssn);
+ session.create();
ssn.sendSessionAttached(atc.getName());
ssn.setState(Session.State.OPEN);
}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index b7e017a..c02a461 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -1076,7 +1076,7 @@
}
@Override
- public int getConsumerCount()
+ public long getConsumerCount()
{
return _subscriptions.values().size();
}
@@ -1113,7 +1113,7 @@
}
@Override
- public long getTransactionStartTime()
+ public long getTransactionStartTimeLong()
{
ServerTransaction serverTransaction = _transaction;
if (serverTransaction.isTransactional())
@@ -1127,7 +1127,7 @@
}
@Override
- public long getTransactionUpdateTime()
+ public long getTransactionUpdateTimeLong()
{
ServerTransaction serverTransaction = _transaction;
if (serverTransaction.isTransactional())
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
new file mode 100644
index 0000000..791805e
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
@@ -0,0 +1,248 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import java.util.Collection;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConsumerListener;
+import org.apache.qpid.server.session.AbstractAMQPSession;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.network.Ticker;
+
+public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarget_0_10>
+ implements AMQSessionModel<Session_0_10, ConsumerTarget_0_10>, LogSubject
+{
+ private final AMQPConnection_0_10 _connection;
+ private final ServerSession _serverSession;
+
+ protected Session_0_10(final Connection<?> parent, final int sessionId, final ServerSession serverSession)
+ {
+ super(parent, sessionId);
+ _connection = (AMQPConnection_0_10) parent;
+ _serverSession = serverSession;
+ }
+
+ @Override
+ public EventLogger getEventLogger()
+ {
+ return getConnection().getEventLogger();
+ }
+
+ @Override
+ public String toLogString()
+ {
+ return _serverSession.toLogString();
+ }
+
+ @Override
+ public AMQPConnection<?> getAMQPConnection()
+ {
+ return _connection;
+ }
+
+ @Override
+ public void block(final Queue<?> queue)
+ {
+ _serverSession.block(queue);
+ }
+
+ @Override
+ public void unblock(final Queue<?> queue)
+ {
+ _serverSession.unblock();
+ }
+
+ @Override
+ public void block()
+ {
+ _serverSession.block();
+ }
+
+ @Override
+ public void unblock()
+ {
+ _serverSession.unblock();
+ }
+
+ @Override
+ public Object getConnectionReference()
+ {
+ return _serverSession.getConnectionReference();
+ }
+
+ @Override
+ public void addConsumerListener(final ConsumerListener listener)
+ {
+ _serverSession.addConsumerListener(listener);
+ }
+
+ @Override
+ public void removeConsumerListener(final ConsumerListener listener)
+ {
+ _serverSession.removeConsumerListener(listener);
+ }
+
+ @Override
+ public void setModelObject(final Session<?> session)
+ {
+ _serverSession.setModelObject(this);
+ }
+
+ @Override
+ public Session<?> getModelObject()
+ {
+ return this;
+ }
+
+ @Override
+ public void transportStateChanged()
+ {
+ _serverSession.transportStateChanged();
+ }
+
+ @Override
+ public boolean processPending()
+ {
+ return _serverSession.processPending();
+ }
+
+ @Override
+ public void notifyWork(final ConsumerTarget_0_10 target)
+ {
+ _serverSession.notifyWork(target);
+ }
+
+ @Override
+ public int compareTo(final AMQSessionModel o)
+ {
+ return getId().compareTo(o.getId());
+ }
+
+ @Override
+ public void addDeleteTask(final Action<? super Session_0_10> task)
+ {
+ _serverSession.addDeleteTask((Action<? super ServerSession>) task);
+ }
+
+ @Override
+ public void removeDeleteTask(final Action<? super Session_0_10> task)
+ {
+ _serverSession.removeDeleteTask((Action<? super ServerSession>) task);
+
+ }
+
+ @Override
+ public int getChannelId()
+ {
+ return _serverSession.getChannelId();
+ }
+
+ @Override
+ public boolean getBlocking()
+ {
+ return _serverSession.getBlocking();
+ }
+
+ @Override
+ public Collection<Consumer<?, ConsumerTarget_0_10>> getConsumers()
+ {
+ return _serverSession.getConsumers();
+ }
+
+ @Override
+ public long getConsumerCount()
+ {
+ return _serverSession.getConsumerCount();
+ }
+
+ @Override
+ public long getTxnRejects()
+ {
+ return _serverSession.getTxnRejects();
+ }
+
+ @Override
+ public long getTxnCommits()
+ {
+ return _serverSession.getTxnCommits();
+ }
+
+ @Override
+ public long getTxnStart()
+ {
+ return _serverSession.getTxnStart();
+ }
+
+ @Override
+ public int getUnacknowledgedMessageCount()
+ {
+ return _serverSession.getUnacknowledgedMessageCount();
+ }
+
+ @Override
+ public void addTicker(final Ticker ticker)
+ {
+ _serverSession.addTicker(ticker);
+ }
+
+ @Override
+ public void removeTicker(final Ticker ticker)
+ {
+ _serverSession.removeTicker(ticker);
+ }
+
+ @Override
+ public void doTimeoutAction(final String idleTransactionTimeoutError)
+ {
+ _serverSession.doTimeoutAction(idleTransactionTimeoutError);
+ }
+
+ @Override
+ public LogSubject getLogSubject()
+ {
+ return _serverSession.getLogSubject();
+ }
+
+ @Override
+ public long getTransactionUpdateTimeLong()
+ {
+ return _serverSession.getTransactionUpdateTimeLong();
+ }
+
+ @Override
+ public long getTransactionStartTimeLong()
+ {
+ return _serverSession.getTransactionStartTimeLong();
+ }
+
+ public AMQPConnection_0_10 getConnection()
+ {
+ return _connection;
+ }
+}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index e23a463..44f3014 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -91,6 +91,7 @@
import org.apache.qpid.server.protocol.v0_8.UnacknowledgedMessageMap.Visitor;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.session.AbstractAMQPSession;
import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
@@ -107,7 +108,7 @@
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.transport.network.Ticker;
-public class AMQChannel
+public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0_8>
implements AMQSessionModel<AMQChannel, ConsumerTarget_0_8>,
AsyncAutoCommitTransaction.FutureRecorder,
ServerChannelMethodProcessor,
@@ -235,6 +236,7 @@
public AMQChannel(AMQPConnection_0_8 connection, int channelId, final MessageStore messageStore)
{
+ super(connection, channelId);
_creditManager = new Pre0_10CreditManager(0L, 0L,
connection.getContextValue(Long.class, AMQPConnection_0_8.HIGH_PREFETCH_LIMIT),
connection.getContextValue(Long.class, AMQPConnection_0_8.BATCH_LIMIT));
@@ -1369,12 +1371,6 @@
}
@Override
- public UUID getId()
- {
- return _id;
- }
-
- @Override
public AMQPConnection_0_8<?> getAMQPConnection()
{
return _connection;
@@ -1915,7 +1911,7 @@
}
@Override
- public int getConsumerCount()
+ public long getConsumerCount()
{
return _tag2SubscriptionTargetMap.size();
}
@@ -1979,7 +1975,7 @@
}
@Override
- public long getTransactionStartTime()
+ public long getTransactionStartTimeLong()
{
ServerTransaction serverTransaction = _transaction;
if (serverTransaction.isTransactional())
@@ -1993,7 +1989,7 @@
}
@Override
- public long getTransactionUpdateTime()
+ public long getTransactionUpdateTimeLong()
{
ServerTransaction serverTransaction = _transaction;
if (serverTransaction.isTransactional())
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index 3e19ade..61401e8 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -409,7 +409,6 @@
synchronized (_channelAddRemoveLock)
{
_channelMap.put(channel.getChannelId(), channel);
- sessionAdded(channel);
if(_blocking)
{
channel.block();
@@ -424,7 +423,6 @@
{
session = _channelMap.remove(channelId);
}
- sessionRemoved(session);
session.dispose();
}
@@ -901,6 +899,7 @@
_logger.debug("Connecting to: {}", virtualHost.getName());
final AMQChannel channel = new AMQChannel(this, channelId, virtualHost.getMessageStore());
+ channel.create();
addChannel(channel);
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
index dcb58ec..2c7267b 100644
--- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
@@ -83,7 +83,7 @@
private MessageDestination _messageDestination;
@Override
- protected void setUp() throws Exception
+ public void setUp() throws Exception
{
super.setUp();
@@ -123,6 +123,9 @@
when(_amqConnection.getContextProvider()).thenReturn(_virtualHost);
when(_amqConnection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
when(_amqConnection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
+ when(_amqConnection.getTaskExecutor()).thenReturn(taskExecutor);
+ when(_amqConnection.getChildExecutor()).thenReturn(taskExecutor);
+ when(_amqConnection.getModel()).thenReturn(BrokerModel.getInstance());
when(_amqConnection.getContextValue(Long.class, AMQPConnection_0_8.BATCH_LIMIT)).thenReturn(AMQPConnection_0_8.DEFAULT_BATCH_LIMIT);
when(_amqConnection.getContextValue(Long.class, AMQPConnection_0_8.HIGH_PREFETCH_LIMIT)).thenReturn(AMQPConnection_0_8.DEFAULT_BATCH_LIMIT);
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
index fbe16ee..d109ce4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
@@ -450,7 +450,6 @@
if (!_closedOnOpen)
{
_sessions.remove(session);
- sessionRemoved(session);
}
}
@@ -595,10 +594,11 @@
closeConnection(ConnectionError.FRAMING_ERROR,
"BEGIN received on channel "
+ channel
- + ". There are no free channels for the broker to responsd on.");
+ + ". There are no free channels for the broker to respond on.");
}
- Session_1_0 session = new Session_1_0(this, begin);
+ Session_1_0 session = new Session_1_0(this, begin, myChannelId);
+ session.create();
_receivingSessions[channel] = session;
_sendingSessions[myChannelId] = session;
@@ -614,8 +614,6 @@
sendFrame(myChannelId, beginToSend);
_sessions.add(session);
- sessionAdded(session);
-
}
else
{
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
index b0b20e9..b856bce 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
@@ -366,7 +366,7 @@
return;
}
- if (!(getSession().getState() == SessionState.END_RECVD || getSession().isEnded()))
+ if (!(getSession().getSessionState() == SessionState.END_RECVD || getSession().isEnded()))
{
Detach detach = new Detach();
detach.setHandle(getLocalHandle());
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index c4f51b6..cf52a69 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -120,6 +120,7 @@
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.session.AbstractAMQPSession;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -129,7 +130,8 @@
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.transport.network.Ticker;
-public class Session_1_0 implements AMQSessionModel<Session_1_0, ConsumerTarget_1_0>, LogSubject
+public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget_1_0>
+ implements AMQSessionModel<Session_1_0, ConsumerTarget_1_0>, LogSubject
{
public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
private static final Logger _logger = LoggerFactory.getLogger(Session_1_0.class);
@@ -148,7 +150,6 @@
new CopyOnWriteArrayList<Action<? super Session_1_0>>();
private final AMQPConnection_1_0 _connection;
- private UUID _id = UUID.randomUUID();
private AtomicBoolean _closed = new AtomicBoolean();
private final Subject _subject = new Subject();
@@ -156,11 +157,11 @@
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
- private Session<?> _modelObject;
+ private Session<?> _modelObject = this;
private final Set<ConsumerTarget_1_0> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
private Iterator<ConsumerTarget_1_0> _processPendingIterator;
- private SessionState _state;
+ private SessionState _sessionState;
private final Map<String, SendingLinkEndpoint> _sendingLinkMap = new HashMap<>();
private final Map<String, ReceivingLinkEndpoint> _receivingLinkMap = new HashMap<>();
@@ -169,7 +170,7 @@
private long _lastAttachedTime;
private short _receivingChannel;
- private short _sendingChannel = -1;
+ private final short _sendingChannel;
private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
@@ -205,23 +206,12 @@
private volatile long _rolledBackTransactions;
private volatile int _unacknowledgedMessages;
-
- public Session_1_0(final AMQPConnection_1_0 connection)
+ public Session_1_0(final AMQPConnection_1_0 connection, Begin begin, short channelId)
{
- this(connection, SessionState.INACTIVE, null);
- }
-
- public Session_1_0(final AMQPConnection_1_0 connection, Begin begin)
- {
- this(connection, SessionState.BEGIN_RECVD, new SequenceNumber(begin.getNextOutgoingId().intValue()));
- }
-
-
- private Session_1_0(final AMQPConnection_1_0 connection, SessionState state, SequenceNumber nextIncomingId)
- {
-
- _state = state;
- _nextIncomingTransferId = nextIncomingId;
+ super(connection, channelId);
+ _sendingChannel = channelId;
+ _sessionState = SessionState.BEGIN_RECVD;
+ _nextIncomingTransferId = new SequenceNumber(begin.getNextOutgoingId().intValue());
_connection = connection;
_subject.getPrincipals().addAll(connection.getSubject().getPrincipals());
_subject.getPrincipals().add(new SessionPrincipal(this));
@@ -237,16 +227,16 @@
{
_receivingChannel = receivingChannel;
_logSubject.updateSessionDetails();
- switch(_state)
+ switch(_sessionState)
{
case INACTIVE:
- _state = SessionState.BEGIN_RECVD;
+ _sessionState = SessionState.BEGIN_RECVD;
break;
case BEGIN_SENT:
- _state = SessionState.ACTIVE;
+ _sessionState = SessionState.ACTIVE;
break;
case END_PIPE:
- _state = SessionState.END_SENT;
+ _sessionState = SessionState.END_SENT;
break;
default:
// TODO error
@@ -261,7 +251,7 @@
public void receiveAttach(final Attach attach)
{
- if(_state == SessionState.ACTIVE)
+ if(_sessionState == SessionState.ACTIVE)
{
UnsignedInteger handle = attach.getHandle();
if(_remoteLinkEndpoints.containsKey(handle))
@@ -439,22 +429,22 @@
public boolean isActive()
{
- return _state == SessionState.ACTIVE;
+ return _sessionState == SessionState.ACTIVE;
}
public void receiveEnd(final End end)
{
- switch (_state)
+ switch (_sessionState)
{
case END_SENT:
- _state = SessionState.ENDED;
+ _sessionState = SessionState.ENDED;
break;
case ACTIVE:
detachLinks();
remoteEnd(end);
short sendChannel = _sendingChannel;
_connection.sendEnd(sendChannel, new End(), true);
- _state = SessionState.ENDED;
+ _sessionState = SessionState.ENDED;
break;
default:
sendChannel = _sendingChannel;
@@ -570,9 +560,9 @@
}
- public SessionState getState()
+ public SessionState getSessionState()
{
- return _state;
+ return _sessionState;
}
public void sendFlow()
@@ -582,15 +572,14 @@
public void setSendingChannel(final short sendingChannel)
{
- _sendingChannel = sendingChannel;
_logSubject.updateSessionDetails();
- switch(_state)
+ switch(_sessionState)
{
case INACTIVE:
- _state = SessionState.BEGIN_SENT;
+ _sessionState = SessionState.BEGIN_SENT;
break;
case BEGIN_RECVD:
- _state = SessionState.ACTIVE;
+ _sessionState = SessionState.ACTIVE;
break;
default:
// TODO error
@@ -652,17 +641,17 @@
public void end(final End end)
{
- switch (_state)
+ switch (_sessionState)
{
case BEGIN_SENT:
_connection.sendEnd(_sendingChannel, end, false);
- _state = SessionState.END_PIPE;
+ _sessionState = SessionState.END_PIPE;
break;
case ACTIVE:
detachLinks();
short sendChannel = _sendingChannel;
_connection.sendEnd(sendChannel, end, true);
- _state = SessionState.END_SENT;
+ _sessionState = SessionState.END_SENT;
break;
default:
sendChannel = _sendingChannel;
@@ -773,7 +762,7 @@
boolean isEnded()
{
- return _state == SessionState.ENDED || _connection.isClosed();
+ return _sessionState == SessionState.ENDED || _connection.isClosed();
}
UnsignedInteger getIncomingWindowSize()
@@ -1617,12 +1606,6 @@
}
@Override
- public UUID getId()
- {
- return _id;
- }
-
- @Override
public AMQPConnection<?> getAMQPConnection()
{
return _connection;
@@ -1877,7 +1860,7 @@
}
@Override
- public int getConsumerCount()
+ public long getConsumerCount()
{
return getConsumers().size();
}
@@ -1971,13 +1954,13 @@
}
@Override
- public long getTransactionStartTime()
+ public long getTransactionStartTimeLong()
{
return 0L;
}
@Override
- public long getTransactionUpdateTime()
+ public long getTransactionUpdateTimeLong()
{
return 0L;
}
@@ -1985,7 +1968,7 @@
@Override
public boolean processPending()
{
- if (!getAMQPConnection().isIOThread() || END_STATES.contains(getState()))
+ if (!getAMQPConnection().isIOThread() || END_STATES.contains(getSessionState()))
{
return false;
}