Update Queue implementation to better define lifetime and exclusivity policies

git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1569102 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
index 270abef..1556876 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
@@ -563,7 +563,7 @@
         {
             _receivingSessions[channel] = null;
 
-            endpoint.end(end);
+            endpoint.receiveEnd(end);
         }
         else
         {
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
index c37c52c..c9212b1 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
@@ -153,19 +153,47 @@
 
     public void end()
     {
-        end(null);
+        end(new End());
     }
 
-    public void end(final End end)
+    public void end(End end)
     {
         synchronized(getLock())
         {
             switch(_state)
             {
                 case BEGIN_SENT:
-                    _connection.sendEnd(getSendingChannel(), new End(), false);
+                    _connection.sendEnd(getSendingChannel(), end, false);
                     _state = SessionState.END_PIPE;
                     break;
+                case ACTIVE:
+                    detachLinks();
+                    short sendChannel = getSendingChannel();
+                    _connection.sendEnd(sendChannel, end, true);
+                    _state = SessionState.END_SENT;
+                    break;
+                default:
+                    sendChannel = getSendingChannel();
+                    End reply = new End();
+                    Error error = new Error();
+                    error.setCondition(AmqpError.ILLEGAL_STATE);
+                    error.setDescription("END called on Session which has not been opened");
+                    reply.setError(error);
+                    _connection.sendEnd(sendChannel, reply, true);
+                    break;
+
+
+            }
+            getLock().notifyAll();
+        }
+    }
+
+    public void receiveEnd(final End end)
+    {
+        synchronized(getLock())
+        {
+            switch(_state)
+            {
                 case END_SENT:
                     _state = SessionState.ENDED;
                     break;
@@ -174,7 +202,7 @@
                     _sessionEventListener.remoteEnd(end);
                     short sendChannel = getSendingChannel();
                     _connection.sendEnd(sendChannel, new End(), true);
-                    _state = end == null ? SessionState.END_SENT : SessionState.ENDED;
+                    _state = SessionState.ENDED;
                     break;
                 default:
                     sendChannel = getSendingChannel();
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
index 9b1ab70..46f2afd 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
@@ -568,7 +568,7 @@
         Map<String, Object> attributesMap = new HashMap<String, Object>();
         attributesMap.put(Exchange.NAME, exchangeName);
         attributesMap.put(Exchange.TYPE, exchangeType);
-        attributesMap.put(Exchange.LIFETIME_POLICY, autoDelete ? LifetimePolicy.AUTO_DELETE.name()
+        attributesMap.put(Exchange.LIFETIME_POLICY, autoDelete ? "AUTO_DELETE"
                 : LifetimePolicy.PERMANENT.name());
         String json = _serializer.serialize(attributesMap);
         UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Exchange.class.getName(), json);
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
index 44f0861..0249400 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
@@ -44,6 +44,7 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
index 25466d9..4e9d94c 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
@@ -20,7 +20,9 @@
  */
 package org.apache.qpid.server.configuration;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.Map;
 import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.Configuration;
@@ -43,7 +45,6 @@
 
         CompositeConfiguration mungedConf = new CompositeConfiguration();
         mungedConf.addConfiguration(_vHostConfig.getConfig().subset("queues.queue." + escapeTagName(name)));
-        mungedConf.addConfiguration(_vHostConfig.getConfig().subset("queues"));
 
         setConfiguration("virtualhosts.virtualhost.queues.queue", mungedConf);
     }
@@ -85,19 +86,33 @@
         return _vHostConfig;
     }
 
+    private boolean getDefaultedBoolean(String attribute)
+    {
+        final Configuration config = _vHostConfig.getConfig();
+        if(config.containsKey("queues."+attribute))
+        {
+            final boolean defaultValue = config.getBoolean("queues." + attribute);
+            return getBooleanValue(attribute, defaultValue);
+        }
+        else
+        {
+            return getBooleanValue(attribute);
+        }
+    }
+
     public boolean getDurable()
     {
-        return getBooleanValue("durable");
+        return getDefaultedBoolean("durable");
     }
 
     public boolean getExclusive()
     {
-        return getBooleanValue("exclusive");
+        return getDefaultedBoolean("exclusive");
     }
 
     public boolean getAutoDelete()
     {
-        return getBooleanValue("autodelete");
+        return getDefaultedBoolean("autodelete");
     }
 
     public String getOwner()
@@ -107,17 +122,41 @@
 
     public boolean getPriority()
     {
-        return getBooleanValue("priority");
+        return getDefaultedBoolean("priority");
     }
 
     public int getPriorities()
     {
-        return getIntValue("priorities", -1);
+        final Configuration config = _vHostConfig.getConfig();
+
+        int defaultValue;
+        if(config.containsKey("queues.priorities"))
+        {
+            defaultValue = config.getInt("queues.priorities");
+        }
+        else
+        {
+            defaultValue = -1;
+        }
+        return getIntValue("priorities", defaultValue);
     }
 
     public String getExchange()
     {
-        return getStringValue("exchange", ExchangeDefaults.DEFAULT_EXCHANGE_NAME);
+        final Configuration config = _vHostConfig.getConfig();
+
+        String defaultValue;
+
+        if(config.containsKey("queues.exchange"))
+        {
+            defaultValue = config.getString("queues.exchange");
+        }
+        else
+        {
+            defaultValue = "";
+        }
+
+        return getStringValue("exchange", defaultValue);
     }
 
     public List getRoutingKeys()
@@ -137,37 +176,37 @@
 
     public int getMaximumMessageAge()
     {
-        return getIntValue("maximumMessageAge", _vHostConfig.getMaximumMessageAge());
+        return getIntValue("maximumMessageAge");
     }
 
     public long getMaximumQueueDepth()
     {
-        return getLongValue("maximumQueueDepth", _vHostConfig.getMaximumQueueDepth());
+        return getLongValue("maximumQueueDepth");
     }
 
     public long getMaximumMessageSize()
     {
-        return getLongValue("maximumMessageSize", _vHostConfig.getMaximumMessageSize());
+        return getLongValue("maximumMessageSize");
     }
 
     public long getMaximumMessageCount()
     {
-        return getLongValue("maximumMessageCount", _vHostConfig.getMaximumMessageCount());
+        return getLongValue("maximumMessageCount");
     }
 
     public long getMinimumAlertRepeatGap()
     {
-        return getLongValue("minimumAlertRepeatGap", _vHostConfig.getMinimumAlertRepeatGap());
+        return getLongValue("minimumAlertRepeatGap");
     }
 
     public long getCapacity()
     {
-        return getLongValue("capacity", _vHostConfig.getCapacity());
+        return getLongValue("capacity");
     }
 
     public long getFlowResumeCapacity()
     {
-        return getLongValue("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity());
+        return getLongValue("flowResumeCapacity");
     }
 
     public boolean isLVQ()
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
index 9870551..ea0faf1 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
@@ -22,6 +22,7 @@
 package org.apache.qpid.server.filter;
 
 import java.lang.ref.WeakReference;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.WeakHashMap;
@@ -29,6 +30,7 @@
 import org.apache.qpid.filter.SelectorParsingException;
 import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -120,19 +122,25 @@
 
     public static final class NoLocalFilter implements MessageFilter
     {
-        private final MessageSource _queue;
+        private final MessageSource<?,?> _queue;
 
-        public NoLocalFilter(MessageSource queue)
+        private NoLocalFilter(MessageSource queue)
         {
             _queue = queue;
         }
 
         public boolean matches(Filterable message)
         {
-            final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession();
-            return exclusiveOwningSession == null ||
-                    exclusiveOwningSession.getConnectionReference() != message.getConnectionReference();
 
+            final Collection<? extends Consumer> consumers = _queue.getConsumers();
+            for(Consumer c : consumers)
+            {
+                if(c.getSessionModel().getConnectionReference() == message.getConnectionReference())
+                {
+                    return false;
+                }
+            }
+            return !consumers.isEmpty();
         }
 
         @Override
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
index 5b0e34b..dcbb693 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
@@ -46,7 +46,7 @@
         AMQConnectionModel connection = session.getConnectionModel();
         setLogStringWithFormat(CHANNEL_FORMAT,
                                connection == null ? -1L : connection.getConnectionId(),
-                               (connection == null || connection.getPrincipalAsString() == null) ? "?" : connection.getPrincipalAsString(),
+                               (connection == null || connection.getAuthorizedPrincipal() == null) ? "?" : connection.getAuthorizedPrincipal().getName(),
                                (connection == null || connection.getRemoteAddressString() == null) ? "?" : connection.getRemoteAddressString(),
                                (connection == null || connection.getVirtualHostName() == null) ? "?" : connection.getVirtualHostName(),
                                session.getChannelId());
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
index 87c2377..b1b8bc5 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
@@ -55,7 +55,7 @@
     {
         if (!_upToDate)
         {
-            if (_session.getPrincipalAsString() != null)
+            if (_session.getAuthorizedPrincipal() != null)
             {
                 if (_session.getVirtualHostName() != null)
                 {
@@ -71,7 +71,7 @@
                      */
                     setLogString("[" + MessageFormat.format(CONNECTION_FORMAT,
                                                             _session.getConnectionId(),
-                                                            _session.getPrincipalAsString(),
+                                                            _session.getAuthorizedPrincipal().getName(),
                                                             _session.getRemoteAddressString(),
                                                             _session.getVirtualHostName())
                                  + "] ");
@@ -82,7 +82,7 @@
                 {
                     setLogString("[" + MessageFormat.format(USER_FORMAT,
                                                             _session.getConnectionId(),
-                                                            _session.getPrincipalAsString(),
+                                                            _session.getAuthorizedPrincipal().getName(),
                                                             _session.getRemoteAddressString())
                                  + "] ");
 
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
index b947bc0..33c8e26 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
@@ -36,7 +36,8 @@
     <T extends ConsumerTarget> C addConsumer(T target, FilterManager filters,
                          Class<? extends ServerMessage> messageClass,
                          String consumerName, EnumSet<Consumer.Option> options)
-            throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException;
+            throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException,
+                   ConsumerAccessRefused;
 
     Collection<C> getConsumers();
 
@@ -44,16 +45,10 @@
 
     void removeConsumerRegistrationListener(ConsumerRegistrationListener<S> listener);
 
-    AuthorizationHolder getAuthorizationHolder();
-
-    void setAuthorizationHolder(AuthorizationHolder principalHolder);
-
-    void setExclusiveOwningSession(AMQSessionModel owner);
-
-    AMQSessionModel getExclusiveOwningSession();
-
     boolean isExclusive();
 
+    boolean verifySessionAccess(AMQSessionModel<?,?> session);
+
     interface ConsumerRegistrationListener<Q extends MessageSource<? extends Consumer,?>>
     {
         void consumerAdded(Q source, Consumer consumer);
@@ -76,7 +71,6 @@
 
         public ExistingExclusiveConsumer()
         {
-            super("");
         }
     }
 
@@ -95,7 +89,15 @@
     {
         public ExistingConsumerPreventsExclusive()
         {
-            super("");
         }
     }
+
+    static final class ConsumerAccessRefused extends Exception
+    {
+        public ConsumerAccessRefused()
+        {
+        }
+    }
+
+
 }
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/ExclusivityPolicy.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/ExclusivityPolicy.java
new file mode 100644
index 0000000..a559a69
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/ExclusivityPolicy.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+public enum ExclusivityPolicy
+{
+    NONE,
+    SESSION,
+    CONNECTION,
+    CONTAINER,
+    PRINCIPAL,
+    LINK
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java
index c9006f4..1e45f8f 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java
@@ -23,5 +23,9 @@
 public enum LifetimePolicy
 {
     PERMANENT,
-    AUTO_DELETE
+    DELETE_ON_CONNECTION_CLOSE,
+    DELETE_ON_SESSION_END,
+    DELETE_ON_NO_OUTBOUND_LINKS,
+    DELETE_ON_NO_LINKS,
+    IN_USE
 }
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index f0241f8..dd3e9e4 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -160,7 +160,7 @@
                             QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
                             CONFIG_PATH));
 
-    int CURRENT_CONFIG_VERSION = 3;
+    int CURRENT_CONFIG_VERSION = 4;
 
     //children
     Collection<VirtualHostAlias> getAliases();
@@ -172,9 +172,8 @@
                             LifetimePolicy lifetime, long ttl, String type, Map<String, Object> attributes)
             throws AccessControlException, IllegalArgumentException;
 
-    Queue createQueue(String name, State initialState, boolean durable,
-                      boolean exclusive, LifetimePolicy lifetime, long ttl, Map<String, Object> attributes)
-                    throws AccessControlException, IllegalArgumentException;
+    Queue createQueue(Map<String, Object> attributes)
+            throws AccessControlException, IllegalArgumentException;
 
     Collection<String> getExchangeTypes();
 
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
index 8847580..265d431 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
@@ -170,7 +170,7 @@
         }
         else if(LIFETIME_POLICY.equals(name))
         {
-            return _queue.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE || _exchange.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT;
+            return _queue.getLifetimePolicy() != LifetimePolicy.PERMANENT || _exchange.getLifetimePolicy() != LifetimePolicy.PERMANENT ? LifetimePolicy.IN_USE : LifetimePolicy.PERMANENT;
         }
         else if(TIME_TO_LIVE.equals(name))
         {
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index dda23f1..3cb6493 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.model.adapter;
 
 import java.security.AccessControlException;
+import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -191,7 +192,8 @@
         }
         else if(name.equals(PRINCIPAL))
         {
-            return _connection.getPrincipalAsString();
+            final Principal authorizedPrincipal = _connection.getAuthorizedPrincipal();
+            return authorizedPrincipal == null ? null : authorizedPrincipal.getName();
         }
         else if(name.equals(PROPERTIES))
         {
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
index cf68740..7935077 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
@@ -130,7 +130,7 @@
         }
         else if(LIFETIME_POLICY.equals(name))
         {
-            return LifetimePolicy.AUTO_DELETE;
+            return LifetimePolicy.DELETE_ON_SESSION_END;
         }
         else if(TIME_TO_LIVE.equals(name))
         {
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
index f7ecbd3..d7b6b8b 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
@@ -201,7 +201,7 @@
 
     public LifetimePolicy getLifetimePolicy()
     {
-        return _exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT;
+        return _exchange.isAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
     }
 
     public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired)
@@ -330,7 +330,7 @@
         }
         else if(LIFETIME_POLICY.equals(name))
         {
-            return _exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT;
+            return _exchange.isAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
         }
         else if(TIME_TO_LIVE.equals(name))
         {
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
index 733b6ce..5d09cfa 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -31,15 +31,7 @@
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.ConfiguredObjectFinder;
-import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.IllegalStateTransitionException;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.QueueNotificationListener;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.model.*;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.*;
@@ -62,7 +54,6 @@
         put(QUEUE_FLOW_CONTROL_SIZE_BYTES, Long.class);
         put(QUEUE_FLOW_RESUME_SIZE_BYTES, Long.class);
         put(MAXIMUM_DELIVERY_ATTEMPTS, Integer.class);
-        put(EXCLUSIVE, Boolean.class);
         put(DESCRIPTION, String.class);
     }});
 
@@ -208,7 +199,7 @@
 
     public LifetimePolicy getLifetimePolicy()
     {
-        return _queue.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT;
+        return _queue.getLifetimePolicy();
     }
 
     public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired)
@@ -274,9 +265,33 @@
             }
             else if(EXCLUSIVE.equals(name))
             {
-                Boolean exclusiveFlag = (Boolean) desired;
-                _queue.setExclusive(exclusiveFlag);
+                ExclusivityPolicy desiredPolicy;
+                if(desired == null)
+                {
+                    desiredPolicy = ExclusivityPolicy.NONE;
+                }
+                else if(desired instanceof  ExclusivityPolicy)
+                {
+                    desiredPolicy = (ExclusivityPolicy)desired;
+                }
+                else if (desired instanceof String)
+                {
+                    desiredPolicy = ExclusivityPolicy.valueOf((String)desired);
+                }
+                else
+                {
+                    throw new IllegalArgumentException("Cannot set " + Queue.EXCLUSIVE + " property to type " + desired.getClass().getName());
+                }
+                try
+                {
+                    _queue.setExclusivityPolicy(desiredPolicy);
+                }
+                catch (MessageSource.ExistingConsumerPreventsExclusive existingConsumerPreventsExclusive)
+                {
+                    throw new IllegalArgumentException("Unable to set exclusivity policy to " + desired + " as an existing combinations of consumers prevents this");
+                }
                 return true;
+
             }
             else if(MESSAGE_GROUP_KEY.equals(name))
             {
@@ -376,7 +391,7 @@
         }
         else if(EXCLUSIVE.equals(name))
         {
-            return _queue.isExclusive();
+            return _queue.getAttribute(Queue.EXCLUSIVE);
         }
         else if(MESSAGE_GROUP_KEY.equals(name))
         {
@@ -458,7 +473,7 @@
         }
         else if(LIFETIME_POLICY.equals(name))
         {
-            return _queue.isAutoDelete() ? LifetimePolicy.AUTO_DELETE : LifetimePolicy.PERMANENT;
+            return _queue.getLifetimePolicy();
         }
         else if(NAME.equals(name))
         {
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index 17ce69d..4cd7432 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -23,7 +23,6 @@
 import java.io.File;
 import java.lang.reflect.Type;
 import java.security.AccessControlException;
-import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -58,18 +57,16 @@
 import org.apache.qpid.server.model.QueueType;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.Statistics;
-import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostAlias;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.ConflationQueue;
 import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.txn.LocalTransaction;
@@ -356,7 +353,7 @@
                     name,
                     type,
                     durable,
-                    lifetime == LifetimePolicy.AUTO_DELETE,
+                    lifetime != null && lifetime != LifetimePolicy.PERMANENT,
                     alternateExchange);
             synchronized (_exchangeAdapters)
             {
@@ -389,7 +386,7 @@
     public Queue createQueue(Map<String, Object> attributes)
             throws AccessControlException, IllegalArgumentException
     {
-        attributes = new HashMap<String, Object>(attributes);
+        checkVHostStateIsActive();
 
         if (attributes.containsKey(Queue.QUEUE_TYPE))
         {
@@ -405,7 +402,7 @@
             }
             if (queueType == QueueType.LVQ && attributes.get(Queue.LVQ_KEY) == null)
             {
-                attributes.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY);
+                attributes.put(Queue.LVQ_KEY, ConflationQueue.DEFAULT_LVQ_KEY);
             }
             else if (queueType == QueueType.PRIORITY && attributes.get(Queue.PRIORITIES) == null)
             {
@@ -417,51 +414,12 @@
             }
         }
 
-        String         name     = MapValueConverter.getStringAttribute(Queue.NAME, attributes, null);
-        State          state    = MapValueConverter.getEnumAttribute(State.class, Queue.STATE, attributes, State.ACTIVE);
-        boolean        durable  = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false);
-        LifetimePolicy lifetime = MapValueConverter.getEnumAttribute(LifetimePolicy.class, Queue.LIFETIME_POLICY, attributes, LifetimePolicy.PERMANENT);
-        long           ttl      = MapValueConverter.getLongAttribute(Queue.TIME_TO_LIVE, attributes, 0l);
-        boolean        exclusive= MapValueConverter.getBooleanAttribute(Queue.EXCLUSIVE, attributes, false);
 
-        attributes.remove(Queue.NAME);
-        attributes.remove(Queue.STATE);
-        attributes.remove(Queue.DURABLE);
-        attributes.remove(Queue.LIFETIME_POLICY);
-        attributes.remove(Queue.TIME_TO_LIVE);
-
-        return createQueue(name, state, durable, exclusive, lifetime, ttl, attributes);
-    }
-
-    public Queue createQueue(final String name,
-                             final State initialState,
-                             final boolean durable,
-                             boolean exclusive,
-                             final LifetimePolicy lifetime,
-                             final long ttl,
-                             final Map<String, Object> attributes)
-            throws AccessControlException, IllegalArgumentException
-    {
-        checkVHostStateIsActive();
-
-        String owner = null;
-        if(exclusive)
-        {
-            Principal authenticatedPrincipal = AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(SecurityManager.getThreadSubject());
-            if(authenticatedPrincipal != null)
-            {
-                owner = authenticatedPrincipal.getName();
-            }
-        }
-
-        final boolean autoDelete = lifetime == LifetimePolicy.AUTO_DELETE;
 
         try
         {
 
-            AMQQueue queue =
-                    _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name,
-                            durable, owner, autoDelete, exclusive, autoDelete && exclusive, attributes);
+            AMQQueue queue = _virtualHost.createQueue(null, attributes);
 
             synchronized (_queueAdapters)
             {
@@ -471,15 +429,15 @@
         }
         catch(QueueExistsException qe)
         {
-            throw new IllegalArgumentException("Queue with name "+name+" already exists");
+            throw new IllegalArgumentException("Queue with name "+MapValueConverter.getStringAttribute(Queue.NAME,attributes)+" already exists", qe);
         }
         catch (QpidSecurityException e)
         {
             throw new AccessControlException(e.toString());
         }
-
     }
 
+
     public String getName()
     {
         return (String)getAttribute(NAME);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index bc272f1..623cf62 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -25,11 +25,13 @@
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.util.Deletable;
 
+import java.security.Principal;
 import java.util.List;
 import java.util.UUID;
 
-public interface AMQConnectionModel extends StatisticsGatherer
+public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends AMQSessionModel<S,T>> extends StatisticsGatherer, Deletable<T>
 {
     /**
      * Close the underlying Connection
@@ -50,7 +52,7 @@
      * @param cause
      * @param message
      */
-    public void closeSession(AMQSessionModel session, AMQConstant cause, String message);
+    public void closeSession(S session, AMQConstant cause, String message);
 
     public long getConnectionId();
 
@@ -59,15 +61,13 @@
      *
      * @return a list of {@link AMQSessionModel}s
      */
-    public List<AMQSessionModel> getSessionModels();
+    public List<S> getSessionModels();
 
     /**
      * Return a {@link LogSubject} for the connection.
      */
     public LogSubject getLogSubject();
 
-    public String getUserName();
-
     public boolean isSessionNameUnique(byte[] name);
 
     String getRemoteAddressString();
@@ -78,7 +78,7 @@
 
     String getClientProduct();
 
-    String getPrincipalAsString();
+    Principal getAuthorizedPrincipal();
 
     long getSessionCountLimit();
 
@@ -93,4 +93,6 @@
     boolean isStopped();
 
     String getVirtualHostName();
+
+    String getRemoteContainerName();
 }
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index 520489a..db92f69 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -26,17 +26,18 @@
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.util.Deletable;
 
 /**
  * Session model interface.
  * Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet}
  * when monitoring the blocking and blocking of queues/sessions in {@link AMQQueue}.
  */
-public interface AMQSessionModel extends Comparable<AMQSessionModel>
+public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQConnectionModel<C,T>> extends Comparable<T>, Deletable<T>
 {
     public UUID getId();
 
-    public AMQConnectionModel getConnectionModel();
+    public C getConnectionModel();
 
     public String getClientID();
 
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 237f2cd..aceebc7 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -27,10 +27,13 @@
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.Deletable;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
@@ -38,9 +41,12 @@
 import java.util.Set;
 
 public interface AMQQueue<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, C extends Consumer>
-        extends Comparable<Q>, ExchangeReferrer, BaseQueue<C>, MessageSource<C,Q>, CapacityChecker, MessageDestination
+        extends Comparable<Q>, ExchangeReferrer, BaseQueue<C>, MessageSource<C,Q>, CapacityChecker, MessageDestination,
+                Deletable<Q>
 {
 
+    void setExclusivityPolicy(ExclusivityPolicy desiredPolicy) throws ExistingConsumerPreventsExclusive;
+
     public interface NotificationListener
     {
         void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
@@ -66,9 +72,7 @@
 
     long getTotalEnqueueCount();
 
-    void setNoLocal(boolean b);
-
-    boolean isAutoDelete();
+    LifetimePolicy getLifetimePolicy();
 
     String getOwner();
 
@@ -104,11 +108,6 @@
 
     boolean resend(final E entry, final C consumer);
 
-    void addQueueDeleteTask(Action<AMQQueue> task);
-    void removeQueueDeleteTask(Action<AMQQueue> task);
-
-
-
     List<E> getMessagesOnTheQueue();
 
     List<Long> getMessagesOnTheQueue(int num);
@@ -189,10 +188,6 @@
     Collection<String> getAvailableAttributes();
     Object getAttribute(String attrName);
 
-    void configure(QueueConfiguration config);
-
-    void setExclusive(boolean exclusive);
-
     /**
      * Gets the maximum delivery count.   If a message on this queue
      * is delivered more than maximumDeliveryCount, the message will be
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 409c528..5003db1 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -20,12 +20,14 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
 import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.configuration.BrokerProperties;
@@ -36,6 +38,7 @@
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.ExchangeExistsException;
 import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
@@ -45,7 +48,6 @@
 
 public class AMQQueueFactory implements QueueFactory
 {
-    public static final String QPID_DEFAULT_LVQ_KEY = "qpid.LVQ_key";
 
 
     public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
@@ -59,346 +61,101 @@
     {
         _virtualHost = virtualHost;
         _queueRegistry = queueRegistry;
-    }
-
-    private abstract static class QueueProperty
-    {
-
-        private final String _argumentName;
-
-
-        public QueueProperty(String argumentName)
-        {
-            _argumentName = argumentName;
-        }
-
-        public String getArgumentName()
-        {
-            return _argumentName;
-        }
-
-
-        public abstract void setPropertyValue(AMQQueue queue, Object value);
-
-    }
-
-    private abstract static class QueueLongProperty extends QueueProperty
-    {
-
-        public QueueLongProperty(String argumentName)
-        {
-            super(argumentName);
-        }
-
-        public void setPropertyValue(AMQQueue queue, Object value)
-        {
-            if(value instanceof Number)
-            {
-                setPropertyValue(queue, ((Number)value).longValue());
-            }
-
-        }
-
-        abstract void setPropertyValue(AMQQueue queue, long value);
-
-
-    }
-
-    private abstract static class QueueIntegerProperty extends QueueProperty
-    {
-        public QueueIntegerProperty(String argumentName)
-        {
-            super(argumentName);
-        }
-
-        public void setPropertyValue(AMQQueue queue, Object value)
-        {
-            if(value instanceof Number)
-            {
-                setPropertyValue(queue, ((Number)value).intValue());
-            }
-
-        }
-        abstract void setPropertyValue(AMQQueue queue, int value);
-    }
-
-    private static final QueueProperty[] DECLARABLE_PROPERTIES = {
-            new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_AGE)
-            {
-                public void setPropertyValue(AMQQueue queue, long value)
-                {
-                    queue.setMaximumMessageAge(value);
-                }
-            },
-            new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_SIZE)
-            {
-                public void setPropertyValue(AMQQueue queue, long value)
-                {
-                    queue.setMaximumMessageSize(value);
-                }
-            },
-            new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)
-            {
-                public void setPropertyValue(AMQQueue queue, long value)
-                {
-                    queue.setMaximumMessageCount(value);
-                }
-            },
-            new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES)
-            {
-                public void setPropertyValue(AMQQueue queue, long value)
-                {
-                    queue.setMaximumQueueDepth(value);
-                }
-            },
-            new QueueLongProperty(Queue.ALERT_REPEAT_GAP)
-            {
-                public void setPropertyValue(AMQQueue queue, long value)
-                {
-                    queue.setMinimumAlertRepeatGap(value);
-                }
-            },
-            new QueueLongProperty(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES)
-            {
-                public void setPropertyValue(AMQQueue queue, long value)
-                {
-                    queue.setCapacity(value);
-                }
-            },
-            new QueueLongProperty(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES)
-            {
-                public void setPropertyValue(AMQQueue queue, long value)
-                {
-                    queue.setFlowResumeCapacity(value);
-                }
-            },
-            new QueueIntegerProperty(Queue.MAXIMUM_DELIVERY_ATTEMPTS)
-            {
-                public void setPropertyValue(AMQQueue queue, int value)
-                {
-                    queue.setMaximumDeliveryCount(value);
-                }
-            }
-    };
+    }   
 
     @Override
-    public AMQQueue restoreQueue(UUID id,
-                                 String queueName,
-                                 String owner,
-                                 boolean autoDelete,
-                                 boolean exclusive,
-                                 boolean deleteOnNoConsumer,
-                                 Map<String, Object> arguments) throws QpidSecurityException
+    public AMQQueue restoreQueue(Map<String, Object> attributes) throws QpidSecurityException
     {
-        return createOrRestoreQueue(id, queueName, true, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, false);
+        return createOrRestoreQueue(null, attributes, false);
 
     }
 
-    /**
-     * @param id the id to use.
-     * @param deleteOnNoConsumer
-     */
     @Override
-    public AMQQueue createQueue(UUID id,
-                                String queueName,
-                                boolean durable,
-                                String owner,
-                                boolean autoDelete,
-                                boolean exclusive,
-                                boolean deleteOnNoConsumer,
-                                Map<String, Object> arguments) throws QpidSecurityException
+    public AMQQueue createQueue(final AMQSessionModel creatingSession,
+                                Map<String, Object> attributes) throws QpidSecurityException
     {
-        return createOrRestoreQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, true);
+        return createOrRestoreQueue(creatingSession, attributes, true);
     }
 
-    private AMQQueue createOrRestoreQueue(UUID id,
-                                          String queueName,
-                                          boolean durable,
-                                          String owner,
-                                          boolean autoDelete,
-                                          boolean exclusive,
-                                          boolean deleteOnNoConsumer,
-                                          Map<String, Object> arguments,
+    private AMQQueue createOrRestoreQueue(final AMQSessionModel creatingSession, Map<String, Object> attributes,
                                           boolean createInStore) throws QpidSecurityException
     {
-        if (id == null)
+
+
+        String queueName = MapValueConverter.getStringAttribute(Queue.NAME,attributes);
+
+        QueueConfiguration config = _virtualHost.getConfiguration().getQueueConfiguration(queueName);
+
+        if (!attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE) && config.getMaximumMessageAge() != 0)
         {
-            throw new IllegalArgumentException("Queue id must not be null");
+            attributes.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, config.getMaximumMessageAge());
         }
-        if (queueName == null)
+        if (!attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES) && config.getMaximumQueueDepth() != 0)
         {
-            throw new IllegalArgumentException("Queue name must not be null");
+            attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, config.getMaximumQueueDepth());
+        }
+        if (!attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE) && config.getMaximumMessageSize() != 0)
+        {
+            attributes.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, config.getMaximumMessageSize());
+        }
+        if (!attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES) && config.getMaximumMessageCount() != 0)
+        {
+            attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, config.getMaximumMessageCount());
+        }
+        if (!attributes.containsKey(Queue.ALERT_REPEAT_GAP) && config.getMinimumAlertRepeatGap() != 0)
+        {
+            attributes.put(Queue.ALERT_REPEAT_GAP, config.getMinimumAlertRepeatGap());
+        }
+        if (config.getMaxDeliveryCount() != 0 && !attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS))
+        {
+            attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, config.getMaxDeliveryCount());
+        }
+        if (!attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES) && config.getCapacity() != 0)
+        {
+            attributes.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, config.getCapacity());
+        }
+        if (!attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES) && config.getFlowResumeCapacity() != 0)
+        {
+            attributes.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, config.getFlowResumeCapacity());
         }
 
 
-        QueueConfiguration queueConfiguration = _virtualHost.getConfiguration().getQueueConfiguration(queueName);
-
-        boolean createDLQ = createDLQ(autoDelete, arguments, queueConfiguration);
+        boolean createDLQ = createDLQ(attributes, config);
         if (createDLQ)
         {
             validateDLNames(queueName);
         }
 
-        int priorities = 1;
-        String conflationKey = null;
-        String sortingKey = null;
+        AMQQueue queue;
 
-        if(arguments != null)
+        if(attributes.containsKey(Queue.SORT_KEY))
         {
-            if(arguments.containsKey(Queue.LVQ_KEY))
-            {
-                conflationKey = (String) arguments.get(Queue.LVQ_KEY);
-                if(conflationKey == null)
-                {
-                    conflationKey = QPID_DEFAULT_LVQ_KEY;
-                }
-            }
-            else if(arguments.containsKey(Queue.PRIORITIES))
-            {
-                Object prioritiesObj = arguments.get(Queue.PRIORITIES);
-                if(prioritiesObj instanceof Number)
-                {
-                    priorities = ((Number)prioritiesObj).intValue();
-                }
-                else if(prioritiesObj instanceof String)
-                {
-                    try
-                    {
-                        priorities = Integer.parseInt(prioritiesObj.toString());
-                    }
-                    catch (NumberFormatException e)
-                    {
-                        // TODO - should warn here of invalid format
-                    }
-                }
-                else
-                {
-                    // TODO - should warn here of invalid format
-                }
-            }
-            else if(arguments.containsKey(Queue.SORT_KEY))
-            {
-                sortingKey = (String)arguments.get(Queue.SORT_KEY);
-            }
+            queue = new SortedQueue(_virtualHost, creatingSession, attributes);
         }
-
-        AMQQueue q;
-        if(sortingKey != null)
+        else if(attributes.containsKey(Queue.LVQ_KEY))
         {
-            q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, sortingKey);
+            queue = new ConflationQueue(_virtualHost, creatingSession, attributes);
         }
-        else if(conflationKey != null)
+        else if(attributes.containsKey(Queue.PRIORITIES))
         {
-            q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, conflationKey);
-        }
-        else if(priorities > 1)
-        {
-            q = new PriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities);
+            queue = new PriorityQueue(_virtualHost, creatingSession, attributes);
         }
         else
         {
-            q = new StandardQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments);
+            queue = new StandardQueue(_virtualHost, creatingSession, attributes);
         }
 
-        q.setDeleteOnNoConsumers(deleteOnNoConsumer);
-
         //Register the new queue
-        _queueRegistry.registerQueue(q);
-
-        q.configure(_virtualHost.getConfiguration().getQueueConfiguration(queueName));
-
-        if(arguments != null)
-        {
-            for(QueueProperty p : DECLARABLE_PROPERTIES)
-            {
-                if(arguments.containsKey(p.getArgumentName()))
-                {
-                    p.setPropertyValue(q, arguments.get(p.getArgumentName()));
-                }
-            }
-
-            if(arguments.get(Queue.NO_LOCAL) instanceof Boolean)
-            {
-                q.setNoLocal((Boolean)arguments.get(Queue.NO_LOCAL));
-            }
-
-        }
+        _queueRegistry.registerQueue(queue);
 
         if(createDLQ)
         {
-            final String dlExchangeName = getDeadLetterExchangeName(queueName);
-            final String dlQueueName = getDeadLetterQueueName(queueName);
-
-            Exchange dlExchange = null;
-            final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName());
-
-            try
-            {
-                dlExchange = _virtualHost.createExchange(dlExchangeId,
-                                                                dlExchangeName,
-                                                                ExchangeDefaults.FANOUT_EXCHANGE_CLASS,
-                                                                true, false, null);
-            }
-            catch(ExchangeExistsException e)
-            {
-                // We're ok if the exchange already exists
-                dlExchange = e.getExistingExchange();
-            }
-            catch (ReservedExchangeNameException e)
-            {
-                throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
-            }
-            catch (AMQUnknownExchangeType e)
-            {
-                throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
-            }
-            catch (UnknownExchangeException e)
-            {
-                throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
-            }
-
-            AMQQueue dlQueue = null;
-
-            synchronized(_queueRegistry)
-            {
-                dlQueue = _queueRegistry.getQueue(dlQueueName);
-
-                if(dlQueue == null)
-                {
-                    //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc
-                    final Map<String, Object> args = new HashMap<String, Object>();
-                    args.put(Queue.CREATE_DLQ_ON_CREATION, false);
-                    args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0);
-
-                    try
-                    {
-                        dlQueue = _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName()), dlQueueName, true, owner, false, exclusive,
-                                false, args);
-                    }
-                    catch (QueueExistsException e)
-                    {
-                        throw new ServerScopedRuntimeException("Attempt to create a queue failed because the " +
-                                                               "queue already exists, however this occurred within " +
-                                                               "a block where the queue existence had previously been " +
-                                                               "checked, and no queue creation should have been " +
-                                                               "possible from another thread", e);
-                    }
-                }
-            }
-
-            //ensure the queue is bound to the exchange
-            if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
-            {
-                //actual routing key used does not matter due to use of fanout exchange,
-                //but we will make the key 'dlq' as it can be logged at creation.
-                dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null);
-            }
-            q.setAlternateExchange(dlExchange);
+            createDLQ(queue);
         }
-        else if(arguments != null && arguments.get(Queue.ALTERNATE_EXCHANGE) instanceof String)
+        else if(attributes != null && attributes.get(Queue.ALTERNATE_EXCHANGE) instanceof String)
         {
 
-            final String altExchangeAttr = (String) arguments.get(Queue.ALTERNATE_EXCHANGE);
+            final String altExchangeAttr = (String) attributes.get(Queue.ALTERNATE_EXCHANGE);
             Exchange altExchange;
             try
             {
@@ -408,32 +165,103 @@
             {
                 altExchange = _virtualHost.getExchange(altExchangeAttr);
             }
-            q.setAlternateExchange(altExchange);
+            queue.setAlternateExchange(altExchange);
         }
 
-        if (createInStore && q.isDurable() && !q.isAutoDelete())
+        if (createInStore && queue.isDurable() && !(queue.getLifetimePolicy()
+                                                    == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+                                                    || queue.getLifetimePolicy()
+                                                       == LifetimePolicy.DELETE_ON_SESSION_END))
         {
-            DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), q);
+            DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), queue);
         }
 
-        return q;
+        return queue;
+    }
+
+    private void createDLQ(final AMQQueue queue) throws QpidSecurityException
+    {
+        final String queueName = queue.getName();
+        final String dlExchangeName = getDeadLetterExchangeName(queueName);
+        final String dlQueueName = getDeadLetterQueueName(queueName);
+
+        Exchange dlExchange = null;
+        final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName());
+
+        try
+        {
+            dlExchange = _virtualHost.createExchange(dlExchangeId,
+                                                            dlExchangeName,
+                                                            ExchangeDefaults.FANOUT_EXCHANGE_CLASS,
+                                                            true, false, null);
+        }
+        catch(ExchangeExistsException e)
+        {
+            // We're ok if the exchange already exists
+            dlExchange = e.getExistingExchange();
+        }
+        catch (ReservedExchangeNameException e)
+        {
+            throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
+        }
+        catch (AMQUnknownExchangeType e)
+        {
+            throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
+        }
+        catch (UnknownExchangeException e)
+        {
+            throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
+        }
+
+        AMQQueue dlQueue = null;
+
+        synchronized(_queueRegistry)
+        {
+            dlQueue = _queueRegistry.getQueue(dlQueueName);
+
+            if(dlQueue == null)
+            {
+                //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc
+                final Map<String, Object> args = new HashMap<String, Object>();
+                args.put(Queue.CREATE_DLQ_ON_CREATION, false);
+                args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0);
+
+                try
+                {
+
+
+                    args.put(Queue.ID, UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName()));
+                    args.put(Queue.NAME, dlQueueName);
+                    args.put(Queue.DURABLE, true);
+                    dlQueue = _virtualHost.createQueue(null, args);
+                }
+                catch (QueueExistsException e)
+                {
+                    throw new ServerScopedRuntimeException("Attempt to create a queue failed because the " +
+                                                           "queue already exists, however this occurred within " +
+                                                           "a block where the queue existence had previously been " +
+                                                           "checked, and no queue creation should have been " +
+                                                           "possible from another thread", e);
+                }
+            }
+        }
+
+        //ensure the queue is bound to the exchange
+        if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
+        {
+            //actual routing key used does not matter due to use of fanout exchange,
+            //but we will make the key 'dlq' as it can be logged at creation.
+            dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null);
+        }
+        queue.setAlternateExchange(dlExchange);
     }
 
     public AMQQueue createAMQQueueImpl(QueueConfiguration config) throws QpidSecurityException
     {
-        String queueName = config.getName();
 
-        boolean durable = config.getDurable();
-        boolean autodelete = config.getAutoDelete();
-        boolean exclusive = config.getExclusive();
-        String owner = config.getOwner();
-        Map<String, Object> arguments = createQueueArgumentsFromConfig(config);
-
-        // we need queues that are defined in config to have deterministic ids.
-        UUID id = UUIDGenerator.generateQueueUUID(queueName, _virtualHost.getName());
-
-        AMQQueue q = createQueue(id, queueName, durable, owner, autodelete, exclusive, false, arguments);
-        q.configure(config);
+        Map<String, Object> arguments = createQueueAttributesFromConfig(_virtualHost, config);
+        
+        AMQQueue q = createOrRestoreQueue(null, arguments, false);
         return q;
     }
 
@@ -471,16 +299,19 @@
     /**
      * Checks if DLQ is enabled for the queue.
      *
-     * @param autoDelete
-     *            queue auto-delete flag
      * @param arguments
      *            queue arguments
      * @param qConfig
      *            queue configuration
      * @return true if DLQ enabled
      */
-    protected static boolean createDLQ(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig)
+    protected static boolean createDLQ(Map<String, Object> arguments, QueueConfiguration qConfig)
     {
+        boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
+                                                                Queue.LIFETIME_POLICY,
+                                                                arguments,
+                                                                LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT;
+
         //feature is not to be enabled for temporary queues or when explicitly disabled by argument
         if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE))))
         {
@@ -525,46 +356,59 @@
         return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
     }
 
-    private static Map<String, Object> createQueueArgumentsFromConfig(QueueConfiguration config)
+    private static Map<String, Object> createQueueAttributesFromConfig(final VirtualHost virtualHost,
+                                                                       QueueConfiguration config)
     {
-        Map<String,Object> arguments = new HashMap<String,Object>();
+        Map<String,Object> attributes = new HashMap<String,Object>();
 
         if(config.getArguments() != null && !config.getArguments().isEmpty())
         {
-            arguments.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap<String, Object>(config.getArguments())));
+            attributes.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap<String, Object>(config.getArguments())));
         }
 
         if(config.isLVQ() || config.getLVQKey() != null)
         {
-            arguments.put(Queue.LVQ_KEY, config.getLVQKey() == null ? QPID_DEFAULT_LVQ_KEY : config.getLVQKey());
+            attributes.put(Queue.LVQ_KEY,
+                          config.getLVQKey() == null ? ConflationQueue.DEFAULT_LVQ_KEY : config.getLVQKey());
         }
         else if (config.getPriority() || config.getPriorities() > 0)
         {
-            arguments.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
+            attributes.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
         }
         else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey()))
         {
-            arguments.put(Queue.SORT_KEY, config.getQueueSortKey());
+            attributes.put(Queue.SORT_KEY, config.getQueueSortKey());
         }
 
         if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled())
         {
-            arguments.put(Queue.CREATE_DLQ_ON_CREATION, true);
+            attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
         }
 
         if (config.getDescription() != null && !"".equals(config.getDescription()))
         {
-            arguments.put(Queue.DESCRIPTION, config.getDescription());
+            attributes.put(Queue.DESCRIPTION, config.getDescription());
         }
 
-        if (arguments.isEmpty())
+        attributes.put(Queue.DURABLE, config.getDurable());
+        attributes.put(Queue.LIFETIME_POLICY,
+                      config.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT);
+        if(config.getExclusive())
         {
-            return Collections.emptyMap();
+            attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER);
         }
-        else
+        if(config.getOwner() != null)
         {
-            return arguments;
+            attributes.put(Queue.OWNER, config.getOwner());
         }
+        
+        attributes.put(Queue.NAME, config.getName());
+        
+        // we need queues that are defined in config to have deterministic ids.
+        attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(config.getName(), virtualHost.getName()));
+
+
+        return attributes;
     }
 
 }
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
index a1ff519..f350368 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
@@ -22,22 +22,32 @@
 package org.apache.qpid.server.queue;
 
 import java.util.Map;
-import java.util.UUID;
 
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class ConflationQueue extends SimpleAMQQueue<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
 {
-    protected ConflationQueue(UUID id,
-                              String name,
-                              boolean durable,
-                              String owner,
-                              boolean autoDelete,
-                              boolean exclusive,
-                              VirtualHost virtualHost,
-                              Map<String, Object> args, String conflationKey)
+    public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key";
+
+
+    protected ConflationQueue(VirtualHost virtualHost,
+                              final AMQSessionModel creatingSession, Map<String, Object> attributes)
     {
-        super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args);
+        super(virtualHost, creatingSession, attributes, entryList(attributes));
+    }
+
+    private static ConflationQueueList.Factory entryList(final Map<String, Object> attributes)
+    {
+
+        String conflationKey = MapValueConverter.getStringAttribute(Queue.LVQ_KEY,
+                                                                    attributes,
+                                                                    DEFAULT_LVQ_KEY);
+
+        // conflation key can still be null if it was present in the map with a null value
+        return new ConflationQueueList.Factory(conflationKey == null ? DEFAULT_LVQ_KEY : conflationKey);
     }
 
     public String getConflationKey()
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
index 7cf245c..c9ac3f3 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
@@ -20,19 +20,20 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
-import java.util.UUID;
 
 public abstract class OutOfOrderQueue<E extends QueueEntryImpl<E,Q,L>, Q extends OutOfOrderQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends SimpleAMQQueue<E,Q,L>
 {
 
-    protected OutOfOrderQueue(UUID id, String name, boolean durable,
-                              String owner, boolean autoDelete, boolean exclusive,
-                              VirtualHost virtualHost, QueueEntryListFactory<E,Q,L> entryListFactory, Map<String, Object> arguments)
+    protected OutOfOrderQueue(VirtualHost virtualHost,
+                              final AMQSessionModel creatingSession,
+                              Map<String, Object> attributes,
+                              QueueEntryListFactory<E, Q, L> entryListFactory)
     {
-        super(id, name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments);
+        super(virtualHost, creatingSession, attributes, entryListFactory);
     }
 
     @Override
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
index 4440d04..1e41a0f 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
@@ -20,23 +20,31 @@
 */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
-import java.util.UUID;
 
 public class PriorityQueue extends OutOfOrderQueue<PriorityQueueList.PriorityQueueEntry, PriorityQueue, PriorityQueueList>
 {
-    protected PriorityQueue(UUID id,
-                            final String name,
-                            final boolean durable,
-                            final String owner,
-                            final boolean autoDelete,
-                            boolean exclusive,
-                            final VirtualHost virtualHost,
-                            Map<String, Object> arguments, int priorities)
+
+    public static final int DEFAULT_PRIORITY_LEVELS = 10;
+
+    protected PriorityQueue(VirtualHost virtualHost,
+                            final AMQSessionModel creatingSession,
+                            Map<String, Object> attributes)
     {
-        super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments);
+        super(virtualHost, creatingSession, attributes, entryList(attributes));
+    }
+
+    private static PriorityQueueList.Factory entryList(final Map<String, Object> attributes)
+    {
+        final Integer priorities = MapValueConverter.getIntegerAttribute(Queue.PRIORITIES, attributes,
+                                                                         DEFAULT_PRIORITY_LEVELS);
+
+        return new PriorityQueueList.Factory(priorities);
     }
 
     public int getPriorities()
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index 589f385..49123f8 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -109,7 +109,7 @@
             }
             if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
             {
-                modelArguments.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY);
+                modelArguments.put(Queue.LVQ_KEY, ConflationQueue.DEFAULT_LVQ_KEY);
             }
             if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
             {
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
index 8c0386c..62a2d93 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
@@ -22,25 +22,15 @@
 
 import java.util.Map;
 import java.util.UUID;
+
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.QpidSecurityException;
 
 public interface QueueFactory
 {
-    AMQQueue createQueue(UUID id,
-                         String queueName,
-                         boolean durable,
-                         String owner,
-                         boolean autoDelete,
-                         boolean exclusive,
-                         boolean deleteOnNoConsumer,
+    AMQQueue createQueue(final AMQSessionModel creatingSession,
                          Map<String, Object> arguments) throws QpidSecurityException;
 
-    AMQQueue restoreQueue(UUID id,
-                          String queueName,
-                          String owner,
-                          boolean autoDelete,
-                          boolean exclusive,
-                          boolean deleteOnNoConsumer,
-                          Map<String, Object> arguments) throws QpidSecurityException;
+    AMQQueue restoreQueue(Map<String, Object> arguments) throws QpidSecurityException;
 
 }
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index b2408f6..aa7025e 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -18,6 +18,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.security.Principal;
 import java.util.*;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -28,11 +29,14 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.logging.LogActor;
@@ -50,12 +54,16 @@
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.Deletable;
+import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -78,19 +86,10 @@
     private final String _name;
 
-    /** null means shared */
-    private final String _owner;
-
-    private AuthorizationHolder _authorizationHolder;
-
-    private boolean _exclusive = false;
-    private AMQSessionModel _exclusiveOwner;
-
+    /** Initialised from the Queue.DESCRIPTION attribute; null when that attribute is absent. */
+    private String _description;
 
     private final boolean _durable;
 
-    /** If true, this queue is deleted when the last subscriber is removed */
-    private final boolean _autoDelete;
-
     private Exchange _alternateExchange;
 
 
@@ -142,6 +141,10 @@
 
     private long _flowResumeCapacity;
 
+    private ExclusivityPolicy _exclusivityPolicy;
+    private LifetimePolicy _lifetimePolicy;
+    private Object _exclusiveOwner; // could be connection, session or Principal
+
     private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
 
 
@@ -157,7 +160,8 @@
     private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
 
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
-    private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>();
+    private final List<Action<? super Q>> _deleteTaskList =
+            new CopyOnWriteArrayList<Action<? super Q>>();
 
 
     private LogSubject _logSubject;
@@ -184,16 +188,98 @@
     private AMQQueue.NotificationListener _notificationListener;
     private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
 
-
-    protected SimpleAMQQueue(UUID id,
-                             String name,
-                             boolean durable,
-                             String owner,
-                             boolean autoDelete,
-                             boolean exclusive,
-                             VirtualHost virtualHost,
-                             QueueEntryListFactory<E,Q,L> entryListFactory, Map<String,Object> arguments)
+    protected SimpleAMQQueue(VirtualHost virtualHost,
+                             final AMQSessionModel<?,?> creatingSession,
+                             Map<String, Object> attributes,
+                             QueueEntryListFactory<E, Q, L> entryListFactory)
     {
+        UUID id = MapValueConverter.getUUIDAttribute(Queue.ID, attributes);
+        String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes);
+        boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE,attributes,false);
+
+
+        _exclusivityPolicy = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,
+                                                                Queue.EXCLUSIVE,
+                                                                attributes,
+                                                                ExclusivityPolicy.NONE);
+        _lifetimePolicy = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
+                                                             Queue.LIFETIME_POLICY,
+                                                             attributes,
+                                                             LifetimePolicy.PERMANENT);
+        if(creatingSession != null)
+        {
+
+            switch(_exclusivityPolicy)
+            {
+
+                case PRINCIPAL:
+                    _exclusiveOwner = creatingSession.getConnectionModel().getAuthorizedPrincipal();
+                    break;
+                case CONTAINER:
+                    _exclusiveOwner = creatingSession.getConnectionModel().getRemoteContainerName();
+                    break;
+                case CONNECTION:
+                    _exclusiveOwner = creatingSession.getConnectionModel();
+                    addExclusivityConstraint(creatingSession.getConnectionModel());
+                    break;
+                case SESSION:
+                    _exclusiveOwner = creatingSession;
+                    addExclusivityConstraint(creatingSession);
+                    break;
+                case NONE:
+                case LINK:
+                    // nothing to do here: NONE needs no owner, and for LINK the check is made when a consumer is added
+                    break;
+                default:
+                    throw new ServerScopedRuntimeException("Unknown exclusivity policy: "
+                                                           + _exclusivityPolicy
+                                                           + " this is a coding error inside Qpid");
+            }
+        }
+        else if(_exclusivityPolicy == ExclusivityPolicy.PRINCIPAL)
+        {
+            String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
+            if(owner != null)
+            {
+                _exclusiveOwner = new AuthenticatedPrincipal(owner);
+            }
+        }
+        else if(_exclusivityPolicy == ExclusivityPolicy.CONTAINER)
+        {
+            String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
+            if(owner != null)
+            {
+                _exclusiveOwner = owner;
+            }
+        }
+
+
+        if(_lifetimePolicy == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE)
+        {
+            if(creatingSession != null)
+            {
+                addLifetimeConstraint(creatingSession.getConnectionModel());
+            }
+            else
+            {
+                throw new IllegalArgumentException("Queues created with a lifetime policy of "
+                                                   + _lifetimePolicy
+                                                   + " must be created from a connection.");
+            }
+        }
+        else if(_lifetimePolicy == LifetimePolicy.DELETE_ON_SESSION_END)
+        {
+            if(creatingSession != null)
+            {
+                addLifetimeConstraint(creatingSession);
+            }
+            else
+            {
+                throw new IllegalArgumentException("Queues created with a lifetime policy of "
+                                                   + _lifetimePolicy
+                                                   + " must be created from a connection.");
+            }
+        }
 
         if (name == null)
         {
@@ -207,12 +293,18 @@
 
         _name = name;
         _durable = durable;
-        _owner = owner;
-        _autoDelete = autoDelete;
-        _exclusive = exclusive;
         _virtualHost = virtualHost;
-        _entries = entryListFactory.createQueueEntryList((Q)this);
-        _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments));
+        _entries = entryListFactory.createQueueEntryList((Q) this);
+        final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes);
+
+        arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy);
+        arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy);
+
+        _arguments = Collections.synchronizedMap(arguments);
+        _description = MapValueConverter.getStringAttribute(Queue.DESCRIPTION, attributes, null);
+
+        _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false);
+
 
         _id = id;
         _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
@@ -220,30 +312,113 @@
         _logSubject = new QueueLogSubject(this);
         _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
 
+
+        if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE))
+        {
+            setMaximumMessageAge(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE, attributes));
+        }
+        else
+        {
+            setMaximumMessageAge(virtualHost.getDefaultAlertThresholdMessageAge());
+        }
+        if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE))
+        {
+            setMaximumMessageSize(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, attributes));
+        }
+        else
+        {
+            setMaximumMessageSize(virtualHost.getDefaultAlertThresholdMessageSize());
+        }
+        if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES))
+        {
+            setMaximumMessageCount(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
+                                                                      attributes));
+        }
+        else
+        {
+            setMaximumMessageCount(virtualHost.getDefaultAlertThresholdQueueDepthMessages());
+        }
+        if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES))
+        {
+            setMaximumQueueDepth(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
+                                                                    attributes));
+        }
+        else
+        {
+            setMaximumQueueDepth(virtualHost.getDefaultAlertThresholdQueueDepthBytes());
+        }
+        if (attributes.containsKey(Queue.ALERT_REPEAT_GAP))
+        {
+            setMinimumAlertRepeatGap(MapValueConverter.getLongAttribute(Queue.ALERT_REPEAT_GAP, attributes));
+        }
+        else
+        {
+            setMinimumAlertRepeatGap(virtualHost.getDefaultAlertRepeatGap());
+        }
+        if (attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES))
+        {
+            setCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, attributes));
+        }
+        else
+        {
+            setCapacity(virtualHost.getDefaultQueueFlowControlSizeBytes());
+        }
+        if (attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES))
+        {
+            setFlowResumeCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, attributes));
+        }
+        else
+        {
+            setFlowResumeCapacity(virtualHost.getDefaultQueueFlowResumeSizeBytes());
+        }
+        if (attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS))
+        {
+            setMaximumDeliveryCount(MapValueConverter.getIntegerAttribute(Queue.MAXIMUM_DELIVERY_ATTEMPTS, attributes));
+        }
+        else
+        {
+            setMaximumDeliveryCount(virtualHost.getDefaultMaximumDeliveryAttempts());
+        }
+
+        final String ownerString;
+        switch(_exclusivityPolicy)
+        {
+            case PRINCIPAL:
+                // the owner is still unset when the queue was built without a session and without an OWNER attribute
+                ownerString = _exclusiveOwner == null ? null : ((Principal) _exclusiveOwner).getName();
+                break;
+            case CONTAINER:
+                ownerString = (String) _exclusiveOwner;
+                break;
+            default:
+                ownerString = null;
+        }
+
         // Log the creation of this Queue.
         // The priorities display is toggled on if we set priorities > 0
         CurrentActor.get().message(_logSubject,
-                                   QueueMessages.CREATED(String.valueOf(_owner),
+                                   QueueMessages.CREATED(ownerString,
                                                          _entries.getPriorities(),
-                                                         _owner != null,
-                                                         autoDelete,
-                                                         durable, !durable,
+                                                         ownerString != null ,
+                                                         _lifetimePolicy != LifetimePolicy.PERMANENT,
+                                                         durable,
+                                                         !durable,
                                                          _entries.getPriorities() > 0));
 
-        if(arguments != null && arguments.containsKey(Queue.MESSAGE_GROUP_KEY))
+        if(attributes != null && attributes.containsKey(Queue.MESSAGE_GROUP_KEY))
         {
-            if(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null
-               && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
+            if(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null
+               && (Boolean)(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
             {
-                Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
+                Object defaultGroup = attributes.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
                 _messageGroupManager =
-                        new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
+                        new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(attributes.get(Queue.MESSAGE_GROUP_KEY)),
                                 defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
                                 this);
             }
             else
             {
-                _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(
+                _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(attributes.get(
                         Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
             }
         }
@@ -256,6 +431,38 @@
 
     }
 
+    private void addLifetimeConstraint(final Deletable<? extends Deletable> lifetimeObject)
+    {
+        final Action<Deletable> deleteQueueTask = new Action<Deletable>()
+        {
+            @Override
+            public void performAction(final Deletable object)
+            {
+                try
+                {
+                    getVirtualHost().removeQueue(SimpleAMQQueue.this);
+                }
+                catch (QpidSecurityException e)
+                {
+                    throw new ConnectionScopedRuntimeException("Unable to delete a queue even though the queue's " +
+                                                               "lifetime was tied to an object being deleted", e);
+                }
+            }
+        };
+        // the queue removal is registered as a delete task of lifetimeObject; the security failure keeps its cause
+        lifetimeObject.addDeleteTask(deleteQueueTask);
+        addDeleteTask(new DeleteDeleteTask(lifetimeObject, deleteQueueTask));
+    }
+
+    private void addExclusivityConstraint(final Deletable<? extends Deletable> lifetimeObject)
+    {
+        final ClearOwnerAction clearOwnerAction = new ClearOwnerAction(lifetimeObject);
+        final DeleteDeleteTask deleteDeleteTask = new DeleteDeleteTask(lifetimeObject, clearOwnerAction);
+        clearOwnerAction.setDeleteTask(deleteDeleteTask);
+        lifetimeObject.addDeleteTask(clearOwnerAction);
+        addDeleteTask(deleteDeleteTask);
+    }
+
     public void resetNotifications()
     {
         // This ensure that the notification checks for the configured alerts are created.
@@ -303,12 +510,7 @@
 
     public boolean isExclusive()
     {
-        return _exclusive;
-    }
-
-    public void setExclusive(boolean exclusive)
-    {
-        _exclusive = exclusive;
+        return _exclusivityPolicy != ExclusivityPolicy.NONE;
     }
 
     public Exchange getAlternateExchange()
@@ -342,27 +544,27 @@
         return _arguments.get(attrName);
     }
 
-    public boolean isAutoDelete()
+    @Override
+    public LifetimePolicy getLifetimePolicy()
     {
-        return _autoDelete;
+        return _lifetimePolicy;
     }
 
     public String getOwner()
     {
-        return _owner;
+        if(_exclusiveOwner != null)
+        {
+            switch(_exclusivityPolicy)
+            {
+                case CONTAINER:
+                    return (String) _exclusiveOwner;
+                case PRINCIPAL:
+                    return ((Principal)_exclusiveOwner).getName();
+            }
+        }
+        return null;
     }
 
-    public AuthorizationHolder getAuthorizationHolder()
-    {
-        return _authorizationHolder;
-    }
-
-    public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder)
-    {
-        _authorizationHolder = authorizationHolder;
-    }
-
-
     public VirtualHost getVirtualHost()
     {
         return _virtualHost;
@@ -381,7 +583,9 @@
                                      final FilterManager filters,
                                      final Class<? extends ServerMessage> messageClass,
                                      final String consumerName,
-                                     EnumSet<Consumer.Option> optionSet) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException
+                                     EnumSet<Consumer.Option> optionSet)
+            throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException,
+                   ConsumerAccessRefused
     {
 
         // Access control
@@ -396,15 +600,77 @@
             throw new ExistingExclusiveConsumer();
         }
 
+        switch(_exclusivityPolicy)
+        {
+            case CONNECTION:
+                if(_exclusiveOwner == null)
+                {
+                    _exclusiveOwner = target.getSessionModel().getConnectionModel();
+                    addExclusivityConstraint(target.getSessionModel().getConnectionModel());
+                }
+                else
+                {
+                    if(_exclusiveOwner != target.getSessionModel().getConnectionModel())
+                    {
+                        throw new ConsumerAccessRefused();
+                    }
+                }
+                break;
+            case SESSION:
+                if(_exclusiveOwner == null)
+                {
+                    _exclusiveOwner = target.getSessionModel();
+                    addExclusivityConstraint(target.getSessionModel());
+                }
+                else
+                {
+                    if(_exclusiveOwner != target.getSessionModel())
+                    {
+                        throw new ConsumerAccessRefused();
+                    }
+                }
+                break;
+            case LINK:
+                if(getConsumerCount() != 0)
+                {
+                    throw new ConsumerAccessRefused();
+                }
+                break;
+            case PRINCIPAL:
+                if(_exclusiveOwner == null)
+                {
+                    _exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+                }
+                else
+                {
+                    if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal()))
+                    {
+                        throw new ConsumerAccessRefused();
+                    }
+                }
+                break;
+            case CONTAINER:
+                if(_exclusiveOwner == null)
+                {
+                    _exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName();
+                }
+                else
+                {
+                    if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName()))
+                    {
+                        throw new ConsumerAccessRefused();
+                    }
+                }
+                break;
+            case NONE:
+                break;
+            default:
+                throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy);
+        }
 
         boolean exclusive =  optionSet.contains(Consumer.Option.EXCLUSIVE);
         boolean isTransient =  optionSet.contains(Consumer.Option.TRANSIENT);
 
-        if (exclusive && !isTransient && getConsumerCount() != 0)
-        {
-            throw new ExistingConsumerPreventsExclusive();
-        }
-
         QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass,
                                                          optionSet.contains(Consumer.Option.ACQUIRES),
                                                          optionSet.contains(Consumer.Option.SEES_REQUEUES),
@@ -473,11 +739,12 @@
             consumer.close();
             // No longer can the queue have an exclusive consumer
             setExclusiveSubscriber(null);
+
             consumer.setQueueContext(null);
 
-            if(!isDeleted() && isExclusive() && getConsumerCount() == 0)
+            if(_exclusivityPolicy == ExclusivityPolicy.LINK)
             {
-                setAuthorizationHolder(null);
+                _exclusiveOwner = null;
             }
 
             if(_messageGroupManager != null)
@@ -495,8 +762,12 @@
 
             // auto-delete queues must be deleted if there are no remaining subscribers
 
-            if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0  )
+            if(!consumer.isTransient()
+               && ( _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+                    || _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_LINKS )
+               && getConsumerCount() == 0)
             {
+
                 if (_logger.isInfoEnabled())
                 {
                     _logger.info("Auto-deleting queue:" + this);
@@ -1266,12 +1537,14 @@
                     });
     }
 
-    public void addQueueDeleteTask(final Action<AMQQueue> task)
+    @Override
+    public void addDeleteTask(final Action<? super Q> task)
     {
         _deleteTaskList.add(task);
     }
 
-    public void removeQueueDeleteTask(final Action<AMQQueue> task)
+    @Override
+    public void removeDeleteTask(final Action<? super Q> task)
     {
         _deleteTaskList.remove(task);
     }
@@ -1343,9 +1616,9 @@
             }
 
 
-            for (Action<AMQQueue> task : _deleteTaskList)
+            for (Action<? super Q> task : _deleteTaskList)
             {
-                task.performAction(this);
+                task.performAction((Q)this);
             }
 
             _deleteTaskList.clear();
@@ -1940,6 +2213,26 @@
         return _notificationChecks;
     }
 
+    private static class DeleteDeleteTask implements Action<Deletable>
+    {
+
+        private final Deletable<? extends Deletable> _lifetimeObject;
+        private final Action<? super Deletable> _deleteQueueOwnerTask;
+
+        public DeleteDeleteTask(final Deletable<? extends Deletable> lifetimeObject,
+                                final Action<? super Deletable> deleteQueueOwnerTask)
+        {
+            _lifetimeObject = lifetimeObject;
+            _deleteQueueOwnerTask = deleteQueueOwnerTask;
+        }
+
+        @Override
+        public void performAction(final Deletable object)
+        {
+            _lifetimeObject.removeDeleteTask(_deleteQueueOwnerTask);
+        }
+    }
+
     private final class QueueEntryListener implements StateChangeListener<E, QueueEntry.State>
     {
 
@@ -1990,38 +2283,6 @@
         return ids;
     }
 
-    public AMQSessionModel getExclusiveOwningSession()
-    {
-        return _exclusiveOwner;
-    }
-
-    public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner)
-    {
-        _exclusive = true;
-        _exclusiveOwner = exclusiveOwner;
-    }
-
-
-    public void configure(QueueConfiguration config)
-    {
-        if (config != null)
-        {
-            setMaximumMessageAge(config.getMaximumMessageAge());
-            setMaximumQueueDepth(config.getMaximumQueueDepth());
-            setMaximumMessageSize(config.getMaximumMessageSize());
-            setMaximumMessageCount(config.getMaximumMessageCount());
-            setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
-            setMaximumDeliveryCount(config.getMaxDeliveryCount());
-            _capacity = config.getCapacity();
-            _flowResumeCapacity = config.getFlowResumeCapacity();
-        }
-    }
-
-    public long getMessageDequeueCount()
-    {
-        return  _dequeueCount.get();
-    }
-
     public long getTotalEnqueueSize()
     {
         return _enqueueSize.get();
@@ -2130,20 +2391,13 @@
     @Override
     public void setDescription(String description)
     {
-        if (description == null)
-        {
-            _arguments.remove(Queue.DESCRIPTION);
-        }
-        else
-        {
-            _arguments.put(Queue.DESCRIPTION, description);
-        }
+        _description = description;
     }
 
     @Override
     public String getDescription()
     {
-        return (String) _arguments.get(Queue.DESCRIPTION);
+        return _description;
     }
 
     public final  <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
@@ -2176,4 +2430,228 @@
 
     }
 
+    @Override
+    public boolean verifySessionAccess(final AMQSessionModel<?, ?> session)
+    {
+        boolean allowed;
+        switch(_exclusivityPolicy)
+        {
+            case NONE:
+                allowed = true;
+                break;
+            case SESSION:
+                allowed = _exclusiveOwner == null || _exclusiveOwner == session;
+                break;
+            case CONNECTION:
+                allowed = _exclusiveOwner == null || _exclusiveOwner == session.getConnectionModel();
+                break;
+            case PRINCIPAL:
+                allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getAuthorizedPrincipal());
+                break;
+            case CONTAINER:
+                allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getRemoteContainerName());
+                break;
+            case LINK:
+                allowed = _exclusiveSubscriber == null || _exclusiveSubscriber.getSessionModel() == session;
+                break;
+            default:
+                throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy);
+        }
+        return allowed;
+    }
+
+    @Override
+    public synchronized void setExclusivityPolicy(final ExclusivityPolicy desiredPolicy) throws ExistingConsumerPreventsExclusive
+    {
+        final ExclusivityPolicy newPolicy = desiredPolicy == null ? ExclusivityPolicy.NONE : desiredPolicy; // null means NONE
+        if(newPolicy != _exclusivityPolicy)
+        {
+            switch(newPolicy)
+            {
+                case NONE:
+                    _exclusiveOwner = null;
+                    break;
+                case PRINCIPAL:
+                    switchToPrincipalExclusivity();
+                    break;
+                case CONTAINER:
+                    switchToContainerExclusivity();
+                    break;
+                case CONNECTION:
+                    switchToConnectionExclusivity();
+                    break;
+                case SESSION:
+                    switchToSessionExclusivity();
+                    break;
+                case LINK:
+                    switchToLinkExclusivity();
+                    break;
+            }
+            _exclusivityPolicy = newPolicy;
+        }
+    }
+
+    private void switchToLinkExclusivity() throws ExistingConsumerPreventsExclusive
+    {
+        switch (getConsumerCount())
+        {
+            case 1:
+                _exclusiveSubscriber = getConsumerList().getHead().getConsumer();
+                // deliberate fall through
+            case 0:
+                _exclusiveOwner = null;
+                break;
+            default:
+                throw new ExistingConsumerPreventsExclusive();
+        }
+
+    }
+
+    private void switchToSessionExclusivity() throws ExistingConsumerPreventsExclusive
+    {
+        // The owner recorded here must be a session: SESSION exclusivity compares the owner by identity against sessions
+        switch(_exclusivityPolicy)
+        {
+            case NONE:
+            case PRINCIPAL:
+            case CONTAINER:
+            case CONNECTION:
+                AMQSessionModel session = null;
+                for(Consumer c : getConsumers())
+                {
+                    if(session == null)
+                    {
+                        session = c.getSessionModel();
+                    }
+                    else if(!session.equals(c.getSessionModel()))
+                    {
+                        throw new ExistingConsumerPreventsExclusive();
+                    }
+                }
+                _exclusiveOwner = session;
+                break;
+            case LINK:
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel();
+        }
+    }
+
+    private void switchToConnectionExclusivity() throws ExistingConsumerPreventsExclusive
+    {
+        switch(_exclusivityPolicy)
+        {
+            case NONE:
+            case CONTAINER:
+            case PRINCIPAL:
+                AMQConnectionModel con = null;
+                for(Consumer c : getConsumers())
+                {
+                    if(con == null)
+                    {
+                        con = c.getSessionModel().getConnectionModel();
+                    }
+                    else if(!con.equals(c.getSessionModel().getConnectionModel()))
+                    {
+                        throw new ExistingConsumerPreventsExclusive();
+                    }
+                }
+                _exclusiveOwner = con;
+                break;
+            case SESSION:
+                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel();
+                break;
+            case LINK:
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel();
+        }
+    }
+
+    private void switchToContainerExclusivity() throws ExistingConsumerPreventsExclusive
+    {
+        switch(_exclusivityPolicy)
+        {
+            case NONE:
+            case PRINCIPAL:
+                String containerID = null;
+                for(Consumer c : getConsumers())
+                {
+                    if(containerID == null)
+                    {
+                        containerID = c.getSessionModel().getConnectionModel().getRemoteContainerName();
+                    }
+                    else if(!containerID.equals(c.getSessionModel().getConnectionModel().getRemoteContainerName()))
+                    {
+                        throw new ExistingConsumerPreventsExclusive();
+                    }
+                }
+                _exclusiveOwner = containerID;
+                break;
+            case CONNECTION:
+                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getRemoteContainerName();
+                break;
+            case SESSION:
+                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getRemoteContainerName();
+                break;
+            case LINK:
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getRemoteContainerName();
+        }
+    }
+
+    private void switchToPrincipalExclusivity() throws ExistingConsumerPreventsExclusive
+    {
+        switch(_exclusivityPolicy)
+        {
+            case NONE:
+            case CONTAINER:
+                Principal principal = null;
+                for(Consumer c : getConsumers())
+                {
+                    if(principal == null)
+                    {
+                        principal = c.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+                    }
+                    else if(!principal.equals(c.getSessionModel().getConnectionModel().getAuthorizedPrincipal()))
+                    {
+                        throw new ExistingConsumerPreventsExclusive();
+                    }
+                }
+                _exclusiveOwner = principal;
+                break;
+            case CONNECTION:
+                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getAuthorizedPrincipal();
+                break;
+            case SESSION:
+                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getAuthorizedPrincipal();
+                break;
+            case LINK:
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+        }
+    }
+
+    private class ClearOwnerAction implements Action<Deletable>
+    {
+        private final Deletable<? extends Deletable> _lifetimeObject;
+        private DeleteDeleteTask _deleteTask;
+
+        public ClearOwnerAction(final Deletable<? extends Deletable> lifetimeObject)
+        {
+            _lifetimeObject = lifetimeObject;
+        }
+
+        @Override
+        public void performAction(final Deletable object)
+        {
+            if(SimpleAMQQueue.this._exclusiveOwner == _lifetimeObject)
+            {
+                SimpleAMQQueue.this._exclusiveOwner = null;
+            }
+            if(_deleteTask != null)
+            {
+                removeDeleteTask(_deleteTask);
+            }
+        }
+
+        public void setDeleteTask(final DeleteDeleteTask deleteTask)
+        {
+            _deleteTask = deleteTask;
+        }
+    }
 }
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
index f85f947..d1da7fe 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -21,11 +21,13 @@
 
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
-import java.util.UUID;
 
 public class SortedQueue extends OutOfOrderQueue<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
 {
@@ -35,28 +37,26 @@
     private final Object _sortedQueueLock = new Object();
     private final String _sortedPropertyName;
 
-    protected SortedQueue(UUID id, final String name,
-                            final boolean durable, final String owner, final boolean autoDelete,
-                            final boolean exclusive, final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName)
+    protected SortedQueue(VirtualHost virtualHost,
+                          final AMQSessionModel creatingSession,
+                          Map<String, Object> attributes,
+                          QueueEntryListFactory<SortedQueueEntry, SortedQueue, SortedQueueEntryList> factory)
     {
-        this(id, name, durable, owner, autoDelete, exclusive,
-             virtualHost, arguments, sortedPropertyName, new SortedQueueEntryListFactory(sortedPropertyName));
+        super(virtualHost, creatingSession, attributes, factory);
+        _sortedPropertyName = MapValueConverter.getStringAttribute(Queue.SORT_KEY,attributes);
     }
 
 
-    protected SortedQueue(UUID id, final String name,
-                          final boolean durable, final String owner, final boolean autoDelete,
-                          final boolean exclusive, final VirtualHost virtualHost,
-                          Map<String, Object> arguments,
-                          String sortedPropertyName,
-                          QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList> factory)
+    protected SortedQueue(VirtualHost virtualHost,
+                          final AMQSessionModel creatingSession, Map<String, Object> attributes)
     {
-        super(id, name, durable, owner, autoDelete, exclusive,
-              virtualHost, factory, arguments);
-        this._sortedPropertyName = sortedPropertyName;
+        this(virtualHost,
+             creatingSession, attributes,
+             new SortedQueueEntryListFactory(MapValueConverter.getStringAttribute(Queue.SORT_KEY, attributes)));
     }
 
 
+
     public String getSortedPropertyName()
     {
         return _sortedPropertyName;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java
index e2ccdc4..8a0570e 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java
@@ -20,22 +20,16 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
-import java.util.UUID;
 
 public class StandardQueue extends SimpleAMQQueue<StandardQueueEntry,StandardQueue,StandardQueueEntryList>
 {
-    public StandardQueue(final UUID id,
-                         final String name,
-                         final boolean durable,
-                         final String owner,
-                         final boolean autoDelete,
-                         final boolean exclusive,
-                         final VirtualHost virtualHost,
-                         final Map<String, Object> arguments)
+    public StandardQueue(final VirtualHost virtualHost,
+                         final AMQSessionModel creatingSession, final Map<String, Object> arguments)
     {
-        super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new StandardQueueEntryList.Factory(), arguments);
+        super(virtualHost, creatingSession, arguments, new StandardQueueEntryList.Factory());
     }
 }
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java b/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
index a379f85..b9e0e59 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
@@ -27,6 +27,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.queue.AMQQueue;
 
 /**
@@ -139,8 +140,8 @@
     {
         setName(queue.getName());
 
-        put(Property.AUTO_DELETE, queue.isAutoDelete());
-        put(Property.TEMPORARY, queue.isAutoDelete());
+        put(Property.AUTO_DELETE, queue.getLifetimePolicy() != LifetimePolicy.PERMANENT);
+        put(Property.TEMPORARY, queue.getLifetimePolicy() != LifetimePolicy.PERMANENT);
         put(Property.DURABLE, queue.isDurable());
         put(Property.EXCLUSIVE, queue.isExclusive());
         if (queue.getAlternateExchange() != null)
@@ -151,10 +152,7 @@
         {
             put(Property.OWNER, queue.getOwner());
         }
-        else if (queue.getAuthorizationHolder() != null)
-        {
-            put(Property.OWNER, queue.getAuthorizationHolder().getAuthorizedPrincipal().getName());
-        }
+
     }
 
     public ObjectProperties(Exchange exch, AMQQueue queue, String routingKey)
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
index fc5806c..096734f 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
@@ -42,17 +42,11 @@
     private static final String BINDING = Binding.class.getSimpleName();
     private static final String EXCHANGE = Exchange.class.getSimpleName();
     private static final String QUEUE = Queue.class.getSimpleName();
-    private static final Set<String> QUEUE_ARGUMENTS_EXCLUDES = new HashSet<String>(Arrays.asList(Queue.NAME,
-                                                                                                  Queue.OWNER,
-                                                                                                  Queue.EXCLUSIVE,
-                                                                                                  Queue.ALTERNATE_EXCHANGE));
+    private static final Set<String> QUEUE_ARGUMENTS_EXCLUDES = new HashSet<String>(Arrays.asList(Queue.ALTERNATE_EXCHANGE));
 
     public static void updateQueue(DurableConfigurationStore store, AMQQueue<?,?,?> queue)
     {
         Map<String, Object> attributesMap = new LinkedHashMap<String, Object>();
-        attributesMap.put(Queue.NAME, queue.getName());
-        attributesMap.put(Queue.OWNER, queue.getOwner());
-        attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
 
         if (queue.getAlternateExchange() != null)
         {
@@ -75,9 +69,6 @@
     public static void createQueue(DurableConfigurationStore store, AMQQueue<?,?,?> queue)
     {
         Map<String, Object> attributesMap = new HashMap<String, Object>();
-        attributesMap.put(Queue.NAME, queue.getName());
-        attributesMap.put(Queue.OWNER, queue.getOwner());
-        attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
         if (queue.getAlternateExchange() != null)
         {
             attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
@@ -103,7 +94,7 @@
         Map<String, Object> attributesMap = new HashMap<String, Object>();
         attributesMap.put(Exchange.NAME, exchange.getName());
         attributesMap.put(Exchange.TYPE, exchange.getTypeName());
-        attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name()
+        attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name()
                 : LifetimePolicy.PERMANENT.name());
 
         store.create(exchange.getId(), EXCHANGE, attributesMap);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/util/Deletable.java b/java/broker-core/src/main/java/org/apache/qpid/server/util/Deletable.java
new file mode 100644
index 0000000..a6b16bb
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/util/Deletable.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public interface Deletable<T extends Deletable>
+{
+    void addDeleteTask(Action<? super T> task);
+    void removeDeleteTask(Action<? super T> task);
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java b/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java
index 37e0177..3543ce3 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java
@@ -28,6 +28,7 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 
 public class MapValueConverter
 {
@@ -217,6 +218,13 @@
         return getIntegerAttribute(name, attributes, null);
     }
 
+    public static Long getLongAttribute(String name, Map<String,Object> attributes)
+    {
+        assertMandatoryAttribute(name, attributes);
+        Object obj = attributes.get(name);
+        return toLong(name, obj, null);
+    }
+
     public static Long getLongAttribute(String name, Map<String,Object> attributes, Long defaultValue)
     {
         Object obj = attributes.get(name);
@@ -409,4 +417,41 @@
         return (T) value;
     }
 
+
+    public static UUID getUUIDAttribute(String name, Map<String, Object> attributes)
+    {
+        assertMandatoryAttribute(name, attributes);
+        return getUUIDAttribute(name, attributes, null);
+    }
+
+    public static UUID getUUIDAttribute(String name, Map<String,Object> attributes, UUID defaultVal)
+    {
+        final Object value = attributes.get(name);
+        return toUUID(value, defaultVal);
+    }
+
+    private static UUID toUUID(final Object value, final UUID defaultVal)
+    {
+        if(value == null)
+        {
+            return defaultVal;
+        }
+        else if(value instanceof UUID)
+        {
+            return (UUID)value;
+        }
+        else if(value instanceof String)
+        {
+            return UUID.fromString((String)value);
+        }
+        else if(value instanceof byte[])
+        {
+            return UUID.nameUUIDFromBytes((byte[])value);
+        }
+        else
+        {
+            throw new IllegalArgumentException("Cannot convert " + value.getClass().getName() + " to UUID");
+        }
+    }
+
 }
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index aa70bb3..0e9b879 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -23,6 +23,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -34,6 +35,9 @@
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.server.configuration.ExchangeConfiguration;
 import org.apache.qpid.server.configuration.QueueConfiguration;
@@ -69,7 +73,9 @@
 import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
 import org.apache.qpid.server.store.Event;
 import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
@@ -529,7 +535,10 @@
             int purged = queue.delete();
 
             getQueueRegistry().unregisterQueue(queue.getName());
-            if (queue.isDurable() && !queue.isAutoDelete())
+            if (queue.isDurable() && !(queue.getLifetimePolicy()
+                                       == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+                                       || queue.getLifetimePolicy()
+                                          == LifetimePolicy.DELETE_ON_SESSION_END))
             {
                 DurableConfigurationStore store = getDurableConfigurationStore();
                 DurableConfigurationStoreHelper.removeQueue(store, queue);
@@ -538,26 +547,24 @@
         }
     }
 
-    @Override
-    public AMQQueue createQueue(UUID id,
-                                String queueName,
-                                boolean durable,
-                                String owner,
-                                boolean autoDelete,
-                                boolean exclusive,
-                                boolean deleteOnNoConsumer,
-                                Map<String, Object> arguments) throws QpidSecurityException, QueueExistsException
+    public AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> attributes) throws QpidSecurityException, QueueExistsException
     {
+        // take a copy: it may be augmented below (e.g. with a generated ID) without touching the caller's map
+        attributes = new LinkedHashMap<String, Object>(attributes);
 
-        if (queueName == null)
-        {
-            throw new IllegalArgumentException("Queue name must not be null");
-        }
+        String queueName = MapValueConverter.getStringAttribute(Queue.NAME, attributes);
+        boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
+                                                                Queue.LIFETIME_POLICY,
+                                                                attributes,
+                                                                LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT;
+        boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false);
+        ExclusivityPolicy exclusive = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,Queue.EXCLUSIVE, attributes, ExclusivityPolicy.NONE);
+        String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
 
-                // Access check
+        // Access check
         if (!getSecurityManager().authoriseCreateQueue(autoDelete,
                                                        durable,
-                                                       exclusive,
+                                                       exclusive != null && exclusive != ExclusivityPolicy.NONE,
                                                        null,
                                                        null,
                                                        queueName,
@@ -573,22 +580,27 @@
             {
                 throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName));
             }
-            if(id == null)
+            if(!attributes.containsKey(Queue.ID))
             {
 
-                id = UUIDGenerator.generateExchangeUUID(queueName, getName());
+                UUID id = UUIDGenerator.generateExchangeUUID(queueName, getName());
                 while(_queueRegistry.getQueue(id) != null)
                 {
                     id = UUID.randomUUID();
                 }
+                attributes.put(Queue.ID, id);
 
             }
-            else if(_queueRegistry.getQueue(id) != null)
+            else if(_queueRegistry.getQueue(MapValueConverter.getUUIDAttribute(Queue.ID, attributes)) != null)
             {
-                throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(queueName));
+                throw new QueueExistsException("Queue with id "
+                                               + MapValueConverter.getUUIDAttribute(Queue.ID,
+                                                                                    attributes)
+                                               + " already exists", _queueRegistry.getQueue(queueName));
             }
-            return _queueFactory.createQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer,
-                    arguments);
+
+
+            return _queueFactory.createQueue(creatingSession, attributes);
         }
 
     }
@@ -980,13 +992,13 @@
                     // house keeping task from running.
                 }
             }
-            for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
+            for (AMQConnectionModel<?,?> connection : getConnectionRegistry().getConnections())
             {
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Checking for long running open transactions on connection " + connection);
                 }
-                for (AMQSessionModel session : connection.getSessionModels())
+                for (AMQSessionModel<?,?> session : connection.getSessionModels())
                 {
                     if (_logger.isDebugEnabled())
                     {
@@ -1046,5 +1058,54 @@
         {
             return _model;
         }
+
+    }
+
+    @Override
+    public long getDefaultAlertThresholdMessageAge()
+    {
+        return getConfiguration().getMaximumMessageAge();
+    }
+
+    @Override
+    public long getDefaultAlertThresholdMessageSize()
+    {
+        return getConfiguration().getMaximumMessageSize();
+    }
+
+    @Override
+    public long getDefaultAlertThresholdQueueDepthMessages()
+    {
+        return getConfiguration().getMaximumMessageCount();
+    }
+
+    @Override
+    public long getDefaultAlertThresholdQueueDepthBytes()
+    {
+        return getConfiguration().getMaximumQueueDepth();
+    }
+
+    @Override
+    public long getDefaultAlertRepeatGap()
+    {
+        return getConfiguration().getMinimumAlertRepeatGap();
+    }
+
+    @Override
+    public long getDefaultQueueFlowControlSizeBytes()
+    {
+        return getConfiguration().getCapacity();
+    }
+
+    @Override
+    public long getDefaultQueueFlowResumeSizeBytes()
+    {
+        return getConfiguration().getFlowResumeCapacity();
+    }
+
+    @Override
+    public int getDefaultMaximumDeliveryAttempts()
+    {
+        return getConfiguration().getMaxDeliveryCount();
     }
 }
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
index 12f8c7d..88caf73 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
@@ -42,6 +42,7 @@
 
 public class DefaultUpgraderProvider implements UpgraderProvider
 {
+    public static final String EXCLUSIVE = "exclusive";
     private final ExchangeRegistry _exchangeRegistry;
     private final VirtualHost _virtualHost;
 
@@ -63,7 +64,8 @@
                 currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader());
             case 2:
                 currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader());
-
+            case 3:
+                currentUpgrader = addUpgrader(currentUpgrader, new Version3Upgrader());
             case CURRENT_CONFIG_VERSION:
                 currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer));
                 break;
@@ -263,4 +265,49 @@
             }
         }
 
+    /*
+     * Upgrader applied to version 3 records: for Queue records a boolean "exclusive" becomes
+     * the enum name "NONE" (false) or "CONTAINER" (true), "owner" is kept only when exclusive
+     * was boolean true, and "durable" is set to "true" if absent. Other types pass through.
+     */
+    private class Version3Upgrader extends NonNullUpgrader
+    {
+
+        @Override
+        public void configuredObject(UUID id, String type, Map<String, Object> attributes)
+        {
+            if(Queue.class.getSimpleName().equals(type))
+            {
+                final Map<String, Object> upgraded = new LinkedHashMap<String, Object>(attributes);
+                final Object storedExclusive = attributes.get(EXCLUSIVE);
+                // the owner is retained only for a queue stored with exclusive == Boolean.TRUE
+                boolean retainOwner = false;
+                if(storedExclusive instanceof Boolean)
+                {
+                    retainOwner = (Boolean) storedExclusive;
+                    upgraded.put(EXCLUSIVE, retainOwner ? "CONTAINER" : "NONE");
+                }
+                if(!retainOwner)
+                {
+                    upgraded.remove("owner");
+                }
+                // records stored without a durable attribute are given the string "true"
+                if(!attributes.containsKey("durable"))
+                {
+                    upgraded.put("durable","true");
+                }
+                attributes = upgraded;
+                getUpdateMap().put(id, new ConfiguredObjectRecord(id, type, attributes));
+            }
+
+            getNextUpgrader().configuredObject(id, type, attributes);
+        }
+
+        @Override
+        public void complete()
+        {
+            getNextUpgrader().complete();
+        }
+    }
+
 }
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
index 59ff1ce..c687cbd 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
@@ -69,7 +69,7 @@
             String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE);
             String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY);
             boolean autoDelete = lifeTimePolicy == null
-                                 || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE;
+                                 || LifetimePolicy.valueOf(lifeTimePolicy) != LifetimePolicy.PERMANENT;
             try
             {
                 _exchange = _exchangeRegistry.getExchange(id);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
index fd836fd..621ea02 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
@@ -104,13 +104,6 @@
         public AMQQueue resolve()
         {
             String queueName = (String) _attributes.get(Queue.NAME);
-            String owner = (String) _attributes.get(Queue.OWNER);
-            boolean exclusive = (Boolean) _attributes.get(Queue.EXCLUSIVE);
-
-            Map<String, Object> queueArgumentsMap = new LinkedHashMap<String, Object>(_attributes);
-            queueArgumentsMap.remove(Queue.NAME);
-            queueArgumentsMap.remove(Queue.OWNER);
-            queueArgumentsMap.remove(Queue.EXCLUSIVE);
 
             try
             {
@@ -122,8 +115,9 @@
 
                 if (_queue == null)
                 {
-                    _queue = _queueFactory.restoreQueue(_id, queueName, owner, false, exclusive,
-                            false, queueArgumentsMap);
+                    Map<String, Object> attributes = new LinkedHashMap<String, Object>(_attributes);
+                    attributes.put(Queue.ID, _id);
+                    _queue = _queueFactory.restoreQueue(attributes);
                 }
             }
             catch (QpidSecurityException e)
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 7cd9045..9996684 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -33,6 +33,7 @@
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.QpidSecurityException;
@@ -59,14 +60,7 @@
 
     int removeQueue(AMQQueue queue) throws QpidSecurityException;
 
-    AMQQueue createQueue(UUID id,
-                         String queueName,
-                         boolean durable,
-                         String owner,
-                         boolean autoDelete,
-                         boolean exclusive,
-                         boolean deleteOnNoConsumer,
-                         Map<String, Object> arguments) throws QueueExistsException, QpidSecurityException;
+    AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> arguments) throws QueueExistsException, QpidSecurityException;
 
 
     Exchange createExchange(UUID id,
@@ -130,4 +124,20 @@
     public void block();
 
     public void unblock();
+
+    long getDefaultAlertThresholdMessageAge();
+
+    long getDefaultAlertThresholdMessageSize();
+
+    long getDefaultAlertThresholdQueueDepthMessages();
+
+    long getDefaultAlertThresholdQueueDepthBytes();
+
+    long getDefaultAlertRepeatGap();
+
+    long getDefaultQueueFlowControlSizeBytes();
+
+    long getDefaultQueueFlowResumeSizeBytes();
+
+    int getDefaultMaximumDeliveryAttempts();
 }
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
index 96ac92a..2c3420a 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
@@ -137,9 +137,6 @@
         qConf = new QueueConfiguration("test", vhostConfig);
         assertEquals(2, qConf.getMaximumMessageAge());
 
-        // Check inherited value
-        qConf = new QueueConfiguration("test", _fullHostConf);
-        assertEquals(1, qConf.getMaximumMessageAge());
     }
 
     public void testGetMaximumQueueDepth() throws ConfigurationException
@@ -153,9 +150,6 @@
         qConf = new QueueConfiguration("test", vhostConfig);
         assertEquals(2, qConf.getMaximumQueueDepth());
 
-        // Check inherited value
-        qConf = new QueueConfiguration("test", _fullHostConf);
-        assertEquals(1, qConf.getMaximumQueueDepth());
     }
 
     public void testGetMaximumMessageSize() throws ConfigurationException
@@ -169,9 +163,6 @@
         qConf = new QueueConfiguration("test", vhostConfig);
         assertEquals(2, qConf.getMaximumMessageSize());
 
-        // Check inherited value
-        qConf = new QueueConfiguration("test", _fullHostConf);
-        assertEquals(1, qConf.getMaximumMessageSize());
     }
 
     public void testGetMaximumMessageCount() throws ConfigurationException
@@ -185,22 +176,11 @@
         qConf = new QueueConfiguration("test", vhostConfig);
         assertEquals(2, qConf.getMaximumMessageCount());
 
-        // Check inherited value
-        qConf = new QueueConfiguration("test", _fullHostConf);
-        assertEquals(1, qConf.getMaximumMessageCount());
     }
 
     public void testGetMinimumAlertRepeatGap() throws Exception
     {
-        // set broker attribute ALERT_REPEAT_GAP to 10
-        when(_broker.getAttribute(Broker.QUEUE_ALERT_REPEAT_GAP)).thenReturn(10);
-
-        // check that broker level setting is available on queue configuration
         QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
-        assertEquals(10, qConf.getMinimumAlertRepeatGap());
-
-        // remove configuration for ALERT_REPEAT_GAP on broker level
-        when(_broker.getAttribute(Broker.QUEUE_ALERT_REPEAT_GAP)).thenReturn(null);
 
         // Check default value
         qConf = new QueueConfiguration("test", _emptyConf);
@@ -211,9 +191,6 @@
         qConf = new QueueConfiguration("test", vhostConfig);
         assertEquals(2, qConf.getMinimumAlertRepeatGap());
 
-        // Check inherited value
-        qConf = new QueueConfiguration("test", _fullHostConf);
-        assertEquals(1, qConf.getMinimumAlertRepeatGap());
     }
 
     public void testSortQueueConfiguration() throws ConfigurationException
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 5a12a41..1358798 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -35,8 +35,10 @@
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
 
+import java.security.Principal;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -333,14 +335,26 @@
         }
 
         @Override
-        public int compareTo(AMQSessionModel o)
+        public void close(AMQConstant cause, String message)
         {
-            return getId().compareTo(o.getId());
         }
 
         @Override
-        public void close(AMQConstant cause, String message)
+        public void addDeleteTask(final Action task)
         {
+
+        }
+
+        @Override
+        public void removeDeleteTask(final Action task)
+        {
+
+        }
+
+        @Override
+        public int compareTo(final Object o)
+        {
+            return 0;
         }
     }
 
@@ -431,12 +445,6 @@
         }
 
         @Override
-        public String getUserName()
-        {
-            return null;
-        }
-
-        @Override
         public boolean isSessionNameUnique(byte[] name)
         {
             return false;
@@ -455,6 +463,12 @@
         }
 
         @Override
+        public String getRemoteContainerName()
+        {
+            return null;
+        }
+
+        @Override
         public String getClientVersion()
         {
             return null;
@@ -467,7 +481,7 @@
         }
 
         @Override
-        public String getPrincipalAsString()
+        public Principal getAuthorizedPrincipal()
         {
             return null;
         }
@@ -512,5 +526,17 @@
         {
             return null;
         }
+
+        @Override
+        public void addDeleteTask(final Action task)
+        {
+
+        }
+
+        @Override
+        public void removeDeleteTask(final Action task)
+        {
+
+        }
     }
 }
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index 573f4a4..e8a2223 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -20,17 +20,23 @@
  */
 package org.apache.qpid.server.exchange;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
 import junit.framework.Assert;
 
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -70,10 +76,17 @@
         }
     }
 
+    private AMQQueue<?,?,?> createQueue(String name) throws QpidSecurityException, QueueExistsException
+    {
+        Map<String,Object> attributes = new HashMap<String, Object>();
+        attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+        attributes.put(Queue.NAME, name);
+        return _vhost.createQueue(null, attributes);
+    }
+
     public void testNoRoute() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false,
-                false, null);
+        AMQQueue<?,?,?> queue = createQueue("a*#b");
         _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
 
 
@@ -84,8 +97,7 @@
 
     public void testDirectMatch() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
-                false, null);
+        AMQQueue<?,?,?> queue = createQueue("ab");
         _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
 
 
@@ -107,7 +119,7 @@
 
     public void testStarMatch() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
+        AMQQueue<?,?,?> queue = createQueue("a*");
         _exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null));
 
 
@@ -138,7 +150,7 @@
 
     public void testHashMatch() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
+        AMQQueue<?,?,?> queue = createQueue("a#");
         _exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null));
 
 
@@ -189,8 +201,7 @@
 
     public void testMidHash() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
-                false, null);
+        AMQQueue<?,?,?> queue = createQueue("a");
         _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
 
         routeMessage("a.c.d.b",0l);
@@ -215,8 +226,7 @@
 
     public void testMatchAfterHash() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
-                false, null);
+        AMQQueue<?,?,?> queue = createQueue("a#");
         _exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null));
 
 
@@ -254,8 +264,7 @@
 
     public void testHashAfterHash() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
-                false, null);
+        AMQQueue<?,?,?> queue = createQueue("a#");
         _exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null));
 
         int queueCount = routeMessage("a.c.b.b.c",0l);
@@ -276,8 +285,7 @@
 
     public void testHashHash() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
-                false, null);
+        AMQQueue<?,?,?> queue = createQueue("a#");
         _exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null));
 
         int queueCount = routeMessage("a.c.b.b.c",0l);
@@ -298,8 +306,7 @@
 
     public void testSubMatchFails() throws Exception
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
-                false, null);
+        AMQQueue queue = createQueue("a");
         _exchange.registerQueue(new Binding(null, "a.b.c.d",queue, _exchange, null));
 
         int queueCount = routeMessage("a.b.c",0l);
@@ -328,8 +335,7 @@
 
     public void testMoreRouting() throws Exception
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
-                false, null);
+        AMQQueue queue = createQueue("a");
         _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
 
 
@@ -342,8 +348,7 @@
 
     public void testMoreQueue() throws Exception
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
-                false, null);
+        AMQQueue queue = createQueue("a");
         _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
 
 
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
index 13e6375..4d38025 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
@@ -22,6 +22,8 @@
 
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 
+import java.security.Principal;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -43,9 +45,12 @@
     {
         super.setUp();
 
+        final Principal principal = mock(Principal.class);
+        when(principal.getName()).thenReturn(USER);
+
         _connection = mock(AMQConnectionModel.class);
         when(_connection.getConnectionId()).thenReturn(CONNECTION_ID);
-        when(_connection.getPrincipalAsString()).thenReturn(USER);
+        when(_connection.getAuthorizedPrincipal()).thenReturn(principal);
         when(_connection.getRemoteAddressString()).thenReturn("/"+IP_STRING);
         when(_connection.getVirtualHostName()).thenReturn(VHOST);
         _subject = new ConnectionLogSubject(_connection);
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index 35ffd08..4b6a51f 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -20,17 +20,12 @@
  */
 package org.apache.qpid.server.queue;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyMap;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,9 +40,10 @@
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.RootMessageLogger;
 import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -98,33 +94,19 @@
 
     private void delegateVhostQueueCreation() throws Exception
     {
-        final ArgumentCaptor<UUID> id = ArgumentCaptor.forClass(UUID.class);
-        final ArgumentCaptor<String> queueName = ArgumentCaptor.forClass(String.class);
-        final ArgumentCaptor<Boolean> durable = ArgumentCaptor.forClass(Boolean.class);
-        final ArgumentCaptor<String> owner = ArgumentCaptor.forClass(String.class);
-        final ArgumentCaptor<Boolean> autoDelete = ArgumentCaptor.forClass(Boolean.class);
-        final ArgumentCaptor<Boolean> exclusive = ArgumentCaptor.forClass(Boolean.class);
-        final ArgumentCaptor<Boolean> deleteOnNoConsumer = ArgumentCaptor.forClass(Boolean.class);
-        final ArgumentCaptor<Map> arguments = ArgumentCaptor.forClass(Map.class);
 
-        when(_virtualHost.createQueue(id.capture(), queueName.capture(), durable.capture(), owner.capture(),
-                autoDelete.capture(), exclusive.capture(), deleteOnNoConsumer.capture(), arguments.capture())).then(
+        final ArgumentCaptor<Map> attributes = ArgumentCaptor.forClass(Map.class);
+
+        when(_virtualHost.createQueue(any(AMQSessionModel.class), attributes.capture())).then(
                 new Answer<AMQQueue>()
                 {
                     @Override
                     public AMQQueue answer(InvocationOnMock invocation) throws Throwable
                     {
-                        return _queueFactory.createQueue(id.getValue(),
-                                queueName.getValue(),
-                                durable.getValue(),
-                                owner.getValue(),
-                                autoDelete.getValue(),
-                                exclusive.getValue(),
-                                deleteOnNoConsumer.getValue(),
-                                arguments.getValue());
+                        return _queueFactory.createQueue(null, attributes.getValue());
                     }
                 }
-        );
+            );
     }
 
     private void mockQueueRegistry()
@@ -217,17 +199,14 @@
 
     public void testPriorityQueueRegistration() throws Exception
     {
-        Map<String,Object> attributes = Collections.singletonMap(Queue.PRIORITIES, (Object) 5);
+        Map<String,Object> attributes = new HashMap<String, Object>();
+        attributes.put(Queue.ID, UUID.randomUUID());
+        attributes.put(Queue.NAME, "testPriorityQueue");
+
+        attributes.put(Queue.PRIORITIES, 5);
 
 
-        AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(),
-                "testPriorityQueue",
-                false,
-                "owner",
-                false,
-                false,
-                false,
-                attributes);
+        AMQQueue queue = _queueFactory.createQueue(null, attributes);
 
         assertEquals("Queue not a priority queue", PriorityQueue.class, queue.getClass());
         verifyQueueRegistered("testPriorityQueue");
@@ -240,10 +219,12 @@
         String queueName = getName();
         String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
 
-        AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false,
-                false,
-                false,
-                null);
+        Map<String,Object> attributes = new HashMap<String, Object>();
+        attributes.put(Queue.ID, UUID.randomUUID());
+        attributes.put(Queue.NAME, queueName);
+
+
+        AMQQueue queue = _queueFactory.createQueue(null, attributes);
         assertEquals("Queue not a simple queue", StandardQueue.class, queue.getClass());
         verifyQueueRegistered(queueName);
 
@@ -261,7 +242,6 @@
      */
     public void testDeadLetterQueueEnabled() throws Exception
     {
-        Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
 
         String queueName = "testDeadLetterQueueEnabled";
         String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
@@ -270,14 +250,13 @@
         assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
         assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
 
-        AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(),
-                queueName,
-                false,
-                "owner",
-                false,
-                false,
-                false,
-                attributes);
+        Map<String,Object> attributes = new HashMap<String, Object>();
+
+        attributes.put(Queue.ID, UUID.randomUUID());
+        attributes.put(Queue.NAME, queueName);
+        attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
+
+        AMQQueue queue = _queueFactory.createQueue(null, attributes);
 
         Exchange altExchange = queue.getAlternateExchange();
         assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
@@ -314,14 +293,11 @@
         assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
         assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
 
-        AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(),
-                queueName,
-                false,
-                "owner",
-                false,
-                false,
-                false,
-                null);
+        Map<String,Object> attributes = new HashMap<String, Object>();
+        attributes.put(Queue.ID, UUID.randomUUID());
+        attributes.put(Queue.NAME, queueName);
+
+        AMQQueue queue = _queueFactory.createQueue(null, attributes);
 
         assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount());
         Exchange altExchange = queue.getAlternateExchange();
@@ -348,7 +324,8 @@
      */
     public void testDeadLetterQueueDisabled() throws Exception
     {
-        Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) false);
+        Map<String,Object> attributes = new HashMap<String, Object>();
+
 
         String queueName = "testDeadLetterQueueDisabled";
         String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
@@ -357,14 +334,11 @@
         assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
         assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
 
-        AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(),
-                queueName,
-                false,
-                "owner",
-                false,
-                false,
-                false,
-                attributes);
+        attributes.put(Queue.ID, UUID.randomUUID());
+        attributes.put(Queue.NAME, queueName);
+        attributes.put(Queue.CREATE_DLQ_ON_CREATION, false);
+
+        AMQQueue queue = _queueFactory.createQueue(null, attributes);
 
         assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange());
         assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName));
@@ -382,7 +356,6 @@
      */
     public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws Exception
     {
-        Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
 
         String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues";
         String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
@@ -391,16 +364,18 @@
         assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
         assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
 
+        Map<String,Object> attributes = new HashMap<String, Object>();
+        attributes.put(Queue.ID, UUID.randomUUID());
+        attributes.put(Queue.NAME, queueName);
+
+        attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
+        attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+
         //create an autodelete queue
-        AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(),
-                queueName,
-                false,
-                "owner",
-                true,
-                false,
-                false,
-                attributes);
-        assertTrue("Queue should be autodelete", queue.isAutoDelete());
+        AMQQueue queue = _queueFactory.createQueue(null, attributes);
+        assertEquals("Queue should be autodelete",
+                     LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS,
+                     queue.getLifetimePolicy());
 
         //ensure that the autodelete property overrides the request to enable DLQ
         assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange());
@@ -417,16 +392,13 @@
      */
     public void testMaximumDeliveryCount() throws Exception
     {
-        Map<String,Object> attributes = Collections.singletonMap(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5);
+        Map<String,Object> attributes = new HashMap<String, Object>();
+        attributes.put(Queue.ID, UUID.randomUUID());
+        attributes.put(Queue.NAME, "testMaximumDeliveryCount");
 
-        final AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(),
-                "testMaximumDeliveryCount",
-                false,
-                "owner",
-                false,
-                false,
-                false,
-                attributes);
+        attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5);
+
+        final AMQQueue queue = _queueFactory.createQueue(null, attributes);
 
         assertNotNull("The queue was not registered as expected ", queue);
         assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount());
@@ -440,14 +412,11 @@
      */
     public void testMaximumDeliveryCountDefault() throws Exception
     {
-        final AMQQueue queue = _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(),
-                "testMaximumDeliveryCount",
-                false,
-                "owner",
-                false,
-                false,
-                false,
-                null);
+        Map<String,Object> attributes = new HashMap<String, Object>();
+        attributes.put(Queue.ID, UUID.randomUUID());
+        attributes.put(Queue.NAME, "testMaximumDeliveryCountDefault");
+
+        final AMQQueue queue = _queueFactory.createQueue(null, attributes);
 
         assertNotNull("The queue was not registered as expected ", queue);
         assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount());
@@ -462,15 +431,16 @@
     {
         try
         {
-            _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false,
-                    false,
-                    null);
+            Map<String,Object> attributes = new HashMap<String, Object>();
+            attributes.put(Queue.ID, UUID.randomUUID());
+
+            _queueFactory.createQueue(null, attributes);
             fail("queue with null name can not be created!");
         }
         catch (Exception e)
         {
             assertTrue(e instanceof IllegalArgumentException);
-            assertEquals("Queue name must not be null", e.getMessage());
+            assertEquals("Value for attribute name is not found", e.getMessage());
         }
     }
 
@@ -486,9 +456,14 @@
             // change DLQ name to make its length bigger than exchange name
             setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLE");
             setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQUEUE");
-            Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
-            _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
-                    false, false, false, attributes);
+
+            Map<String,Object> attributes = new HashMap<String, Object>();
+            attributes.put(Queue.ID, UUID.randomUUID());
+            attributes.put(Queue.NAME, queueName);
+
+            attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
+
+            _queueFactory.createQueue(null, attributes);
             fail("queue with DLQ name having more than 255 characters can not be created!");
         }
         catch (Exception e)
@@ -511,9 +486,14 @@
             // change DLQ name to make its length bigger than exchange name
             setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLEXCHANGE");
             setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQ");
-            Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
-            _queueFactory.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
-                    false, false, false, attributes);
+
+            Map<String,Object> attributes = new HashMap<String, Object>();
+            attributes.put(Queue.ID, UUID.randomUUID());
+            attributes.put(Queue.NAME, queueName);
+
+            attributes.put(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
+
+            _queueFactory.createQueue(null, attributes);
             fail("queue with DLE name having more than 255 characters can not be created!");
         }
         catch (Exception e)
@@ -528,6 +508,7 @@
     {
 
         Map<String,String> arguments = new HashMap<String, String>();
+
         arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY,"mykey");
         arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP,"1");
 
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
index c2291f5..42b83d8 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
@@ -26,9 +26,11 @@
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 
 public class ConflationQueueListTest extends TestCase
@@ -46,8 +48,11 @@
     protected void setUp() throws Exception
     {
         super.setUp();
-        _queue = new ConflationQueue(UUID.randomUUID(), getName(), false, null, false, false, mock(VirtualHost.class),
-                                     Collections.<String,Object>emptyMap(),CONFLATION_KEY);
+        Map<String,Object> queueAttributes = new HashMap<String, Object>();
+        queueAttributes.put(Queue.ID, UUID.randomUUID());
+        queueAttributes.put(Queue.NAME, getName());
+        queueAttributes.put(Queue.LVQ_KEY, CONFLATION_KEY);
+        _queue = new ConflationQueue(mock(VirtualHost.class), null, queueAttributes);
         _list = _queue.getEntries();
     }
 
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index c462468..91c0136 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -27,6 +27,7 @@
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.consumer.Consumer;
@@ -131,6 +132,12 @@
         return 0;
     }
 
+    @Override
+    public LifetimePolicy getLifetimePolicy()
+    {
+        return null;
+    }
+
     public int getBindingCountHigh()
     {
         return 0;
@@ -216,7 +223,7 @@
     }
 
     @Override
-    public void addQueueDeleteTask(final Action task)
+    public void addDeleteTask(final Action task)
     {
 
     }
@@ -345,7 +352,7 @@
     }
 
     @Override
-    public void removeQueueDeleteTask(final Action task)
+    public void removeDeleteTask(final Action task)
     {
 
     }
@@ -478,6 +485,12 @@
         return false;
     }
 
+    @Override
+    public boolean verifySessionAccess(final AMQSessionModel session)
+    {
+        return false;
+    }
+
     public Exchange getAlternateExchange()
     {
         return null;
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
index 3db5d0f..0ac4eb8 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
@@ -26,10 +26,12 @@
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 
 public class PriorityQueueListTest extends QpidTestCase
@@ -45,16 +47,11 @@
     protected void setUp()
     {
         QueueEntry[] entries = new QueueEntry[PRIORITIES.length];
-
-        PriorityQueue queue = new PriorityQueue(UUID.randomUUID(),
-                                                getName(),
-                                                false,
-                                                null,
-                                                false,
-                                                false,
-                                                mock(VirtualHost.class),
-                                                Collections.<String,Object>emptyMap(),
-                                                10);
+        Map<String,Object> queueAttributes = new HashMap<String, Object>();
+        queueAttributes.put(Queue.ID, UUID.randomUUID());
+        queueAttributes.put(Queue.NAME, getName());
+        queueAttributes.put(Queue.PRIORITIES, 10);
+        PriorityQueue queue = new PriorityQueue(mock(VirtualHost.class), null, queueAttributes);
         _list = queue.getEntries();
 
         for (int i = 0; i < PRIORITIES.length; i++)
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 66e4286..139e324 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -27,11 +27,13 @@
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.MessageInstance.EntryState;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.lang.reflect.Field;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.mockito.Mockito.mock;
@@ -199,8 +201,10 @@
     {
         int numberOfEntries = 5;
         QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries];
-        StandardQueue queue = new StandardQueue(UUID.randomUUID(), getName(), false, null, false, false,
-                                                mock(VirtualHost.class), Collections.<String,Object>emptyMap());
+        Map<String,Object> queueAttributes = new HashMap<String, Object>();
+        queueAttributes.put(Queue.ID, UUID.randomUUID());
+        queueAttributes.put(Queue.NAME, getName());
+        StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes);
         OrderedQueueEntryList queueEntryList = queue.getEntries();
 
         // create test entries
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java
index 1934349..956a9fc 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java
@@ -29,12 +29,12 @@
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.when;
 
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.exchange.DirectExchange;
@@ -51,10 +51,6 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends QpidTestCase
 {
     private static final Logger _logger = Logger.getLogger(SimpleAMQQueueTestBase.class);
@@ -68,7 +64,7 @@
     private DirectExchange _exchange;
     private MockConsumer _consumerTarget = new MockConsumer();
     private QueueConsumer _consumer;
-    private Map<String,Object> _arguments = null;
+    private Map<String,Object> _arguments = Collections.emptyMap();
 
     @Override
     public void setUp() throws Exception
@@ -78,8 +74,12 @@
 
         _virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName());
 
-        _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner,
-                false, false, false, _arguments);
+        Map<String,Object> attributes = new HashMap<String, Object>(_arguments);
+        attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+        attributes.put(Queue.NAME, _qname);
+        attributes.put(Queue.OWNER, _owner);
+
+        _queue = (Q) _virtualHost.createQueue(null, attributes);
 
         _exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
     }
@@ -104,9 +104,10 @@
         _queue.stop();
         try
         {
-            _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null,
-                                                                         false, _owner, false,
-                                                                         false, false, _arguments);
+            Map<String,Object> attributes = new HashMap<String, Object>(_arguments);
+            attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+
+            _queue = (Q) _virtualHost.createQueue(null, attributes);
             assertNull("Queue was created", _queue);
         }
         catch (IllegalArgumentException e)
@@ -115,10 +116,10 @@
                             e.getMessage().contains("name"));
         }
 
-        _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(),
-                                                                     "differentName", false,
-                                                                     _owner, false,
-                                                                     false, false, _arguments);
+        Map<String,Object> attributes = new HashMap<String, Object>(_arguments);
+        attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+        attributes.put(Queue.NAME, "differentName");
+        _queue = (Q) _virtualHost.createQueue(null, attributes);
         assertNotNull("Queue was not created", _queue);
     }
 
@@ -1137,15 +1138,17 @@
     {
         public NonAsyncDeliverQueue(final TestSimpleQueueEntryListFactory factory, VirtualHost vhost)
         {
-            super(UUIDGenerator.generateRandomUUID(),
-                  "testQueue",
-                  false,
-                  "testOwner",
-                  false,
-                  false,
-                  vhost,
-                  factory,
-                  null);
+            super(vhost, null, attributes(), factory);
+        }
+
+        private static Map<String,Object> attributes()
+        {
+            Map<String,Object> attributes = new HashMap<String, Object>();
+            attributes.put(Queue.ID, UUID.randomUUID());
+            attributes.put(Queue.NAME, "test");
+            attributes.put(Queue.DURABLE, false);
+            attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
+            return attributes;
         }
 
         @Override
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
index c115af5..286c47f 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
@@ -21,11 +21,15 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class SimpleAMQQueueThreadPoolTest extends QpidTestCase
 {
 
@@ -50,8 +54,10 @@
 
         try
         {
-            SimpleAMQQueue queue = (SimpleAMQQueue)
-                    test.createQueue(UUIDGenerator.generateRandomUUID(), "test", false, "owner", false, false, false, null);
+            Map<String,Object> attributes = new HashMap<String, Object>();
+            attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+            attributes.put(Queue.NAME, "test");
+            AMQQueue queue = test.createQueue(null, attributes);
 
             assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
 
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index 9457d59..7aa546e 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -22,8 +22,11 @@
 
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.mockito.Mockito.mock;
@@ -38,8 +41,10 @@
     public void setUp() throws Exception
     {
         mockLogging();
-
-        StandardQueue queue = new StandardQueue(UUID.randomUUID(), "SimpleQueueEntryImplTest", false, null,false, false, mock(VirtualHost.class),null);
+        Map<String,Object> queueAttributes = new HashMap<String, Object>();
+        queueAttributes.put(Queue.ID, UUID.randomUUID());
+        queueAttributes.put(Queue.NAME, "SimpleQueueEntryImplTest");
+        StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes);
 
         queueEntryList = queue.getEntries();
 
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
index f8953bb..cfa58f1 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -26,9 +26,13 @@
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.mockito.Matchers.eq;
@@ -74,8 +78,15 @@
     {
         mockLogging();
 
+        Map<String,Object> attributes = new HashMap<String,Object>();
+        attributes.put(Queue.ID,UUID.randomUUID());
+        attributes.put(Queue.NAME, getName());
+        attributes.put(Queue.DURABLE, false);
+        attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
+        attributes.put(Queue.SORT_KEY, "KEY");
+
         // Create test list
-        _testQueue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+        _testQueue = new SortedQueue(mock(VirtualHost.class), null, attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
         {
 
             @Override
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
index 25d6dbb..e558751 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
@@ -20,11 +20,15 @@
 package org.apache.qpid.server.queue;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import static org.mockito.Matchers.eq;
@@ -42,7 +46,15 @@
     public void setUp() throws Exception
     {
         mockLogging();
-        SortedQueue queue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+
+        Map<String,Object> attributes = new HashMap<String,Object>();
+        attributes.put(Queue.ID,UUID.randomUUID());
+        attributes.put(Queue.NAME, getName());
+        attributes.put(Queue.DURABLE, false);
+        attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
+        attributes.put(Queue.SORT_KEY, "KEY");
+
+        SortedQueue queue = new SortedQueue(mock(VirtualHost.class), null, attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
         {
 
             @Override
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
index 2c697cf..fe9273d 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
@@ -22,9 +22,10 @@
 
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,8 +46,11 @@
     protected void setUp()
     {
         oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9");
-        _testQueue = new StandardQueue(UUID.randomUUID(),getName(),false,null,false,false,mock(VirtualHost.class),
-                                       Collections.<String,Object>emptyMap());
+
+        Map<String,Object> queueAttributes = new HashMap<String, Object>();
+        queueAttributes.put(Queue.ID, UUID.randomUUID());
+        queueAttributes.put(Queue.NAME, getName());
+        _testQueue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes);
 
         _sqel = _testQueue.getEntries();
         for(int i = 1; i <= 100; i++)
@@ -86,9 +90,10 @@
     {
         if(newList)
         {
-            StandardQueue queue =
-                    new StandardQueue(UUID.randomUUID(), getName(), false, null, false, false, mock(VirtualHost.class),
-                                      Collections.<String, Object>emptyMap());
+            Map<String,Object> queueAttributes = new HashMap<String, Object>();
+            queueAttributes.put(Queue.ID, UUID.randomUUID());
+            queueAttributes.put(Queue.NAME, getName());
+            StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes);
 
             return queue.getEntries();
         }
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index c7b812e..77886fa 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -25,18 +25,21 @@
 import org.apache.qpid.server.consumer.MockConsumer;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.mockito.Mockito.mock;
+
 public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry, StandardQueue, StandardQueueEntryList>
 {
 
@@ -44,7 +47,12 @@
     {
         try
         {
-            setQueue(new StandardQueue(UUIDGenerator.generateRandomUUID(), getQname(), false, getOwner(), false,false, null, getArguments()));
+            Map<String,Object> queueAttributes = new HashMap<String, Object>();
+            queueAttributes.put(Queue.ID, UUID.randomUUID());
+            queueAttributes.put(Queue.NAME, "testActiveConsumerCount");
+            queueAttributes.put(Queue.OWNER, "testOwner");
+
+            setQueue(new StandardQueue(null, null, queueAttributes));
             assertNull("Queue was created", getQueue());
         }
         catch (IllegalArgumentException e)
@@ -58,7 +66,13 @@
     public void testAutoDeleteQueue() throws Exception
     {
         getQueue().stop();
-        setQueue(new StandardQueue(UUIDGenerator.generateRandomUUID(), getQname(), false, null, true, false, getVirtualHost(), Collections.<String,Object>emptyMap()));
+        Map<String,Object> queueAttributes = new HashMap<String, Object>();
+        queueAttributes.put(Queue.ID, UUID.randomUUID());
+        queueAttributes.put(Queue.NAME, getQname());
+        queueAttributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+        final StandardQueue queue = new StandardQueue(getVirtualHost(), null, queueAttributes);
+
+        setQueue(queue);
         getQueue().setDeleteOnNoConsumers(true);
 
         ServerMessage message = createMessage(25l);
@@ -75,8 +89,12 @@
 
     public void testActiveConsumerCount() throws Exception
     {
-        final StandardQueue queue = new StandardQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false,
-                                                        "testOwner", false, false, getVirtualHost(), null);
+
+        Map<String,Object> queueAttributes = new HashMap<String, Object>();
+        queueAttributes.put(Queue.ID, UUID.randomUUID());
+        queueAttributes.put(Queue.NAME, "testActiveConsumerCount");
+        queueAttributes.put(Queue.OWNER, "testOwner");
+        final StandardQueue queue = new StandardQueue(getVirtualHost(), null, queueAttributes);
 
         //verify adding an active consumer increases the count
         final MockConsumer consumer1 = new MockConsumer();
@@ -145,8 +163,7 @@
     public void testEnqueueDequeuedEntry() throws Exception
     {
         // create a queue where each even entry is considered a dequeued
-        SimpleAMQQueue queue = new DequeuedQueue(UUIDGenerator.generateRandomUUID(), "test", false,
-                                                  "testOwner", false, false, getVirtualHost(), null);
+        SimpleAMQQueue queue = new DequeuedQueue(getVirtualHost());
         // create a consumer
         MockConsumer consumer = new MockConsumer();
 
@@ -180,9 +197,11 @@
         int messageNumber = 4;
         int dequeueMessageIndex = 1;
 
+        Map<String,Object> queueAttributes = new HashMap<String, Object>();
+        queueAttributes.put(Queue.ID, UUID.randomUUID());
+        queueAttributes.put(Queue.NAME, "test");
         // create queue with overridden method deliverAsync
-        StandardQueue testQueue = new StandardQueue(UUIDGenerator.generateRandomUUID(), "test",
-                                                    false, "testOwner", false, false, getVirtualHost(), null)
+        StandardQueue testQueue = new StandardQueue(getVirtualHost(), null, queueAttributes)
         {
             @Override
             public void deliverAsync(QueueConsumer sub)
@@ -249,16 +268,19 @@
     private static class DequeuedQueue extends SimpleAMQQueue<DequeuedQueueEntry, DequeuedQueue, DequeuedQueueEntryList>
     {
 
-        public DequeuedQueue(final UUID id,
-                             final String queueName,
-                             final boolean durable,
-                             final String owner,
-                             final boolean autoDelete,
-                             final boolean exclusive,
-                             final VirtualHost virtualHost,
-                             final Map<String, Object> arguments)
+        public DequeuedQueue(VirtualHost virtualHost) // only the virtual host varies; every other setting is fixed below
         {
-            super(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new DequeuedQueueEntryListFactory(), arguments);
+            super(virtualHost, null, fixedAttributes(), new DequeuedQueueEntryListFactory()); // NOTE(review): null is presumably the creating session - confirm
+        }
+
+        private static Map<String,Object> fixedAttributes() // attribute map used for every DequeuedQueue instance
+        {
+            final Map<String,Object> attrs = new HashMap<String, Object>();
+            attrs.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
+            attrs.put(Queue.DURABLE, false);
+            attrs.put(Queue.NAME, "test");
+            attrs.put(Queue.ID, UUID.randomUUID()); // fresh random id per instance
+            return attrs;
         }
     }
     private static class DequeuedQueueEntryListFactory implements QueueEntryListFactory<DequeuedQueueEntry, DequeuedQueue, DequeuedQueueEntryList>
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 8b66e7d..c5c68ec 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -32,6 +32,7 @@
 
 import java.io.File;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 
@@ -40,6 +41,7 @@
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.ExclusivityPolicy;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
@@ -143,7 +145,7 @@
         verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(EXCHANGE),
                 eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(),
                         org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type",
-                        org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString())));
+                        org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name())));
     }
 
     private Map<String,Object> map(Object... vals)
@@ -220,7 +222,7 @@
         Map<String, Object> queueAttributes = new HashMap<String, Object>();
         queueAttributes.put(Queue.NAME, getName());
         queueAttributes.put(Queue.OWNER, getName()+"Owner");
-        queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
+        queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
         verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
     }
 
@@ -240,7 +242,7 @@
 
         queueAttributes.put(Queue.NAME, getName());
         queueAttributes.put(Queue.OWNER, getName()+"Owner");
-        queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
+        queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
         queueAttributes.putAll(attributes);
 
         verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -258,7 +260,7 @@
         Map<String, Object> queueAttributes = new HashMap<String, Object>();
         queueAttributes.put(Queue.NAME, getName());
         queueAttributes.put(Queue.OWNER, getName()+"Owner");
-        queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
+        queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
         queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
 
         verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -292,8 +294,6 @@
         Map<String,Object> queueAttributes = new HashMap<String, Object>();
 
         queueAttributes.put(Queue.NAME, getName());
-        queueAttributes.put(Queue.OWNER, getName()+"Owner");
-        queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
         queueAttributes.putAll(attributes);
 
         verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -320,8 +320,6 @@
         Map<String,Object> queueAttributes = new HashMap<String, Object>();
 
         queueAttributes.put(Queue.NAME, getName());
-        queueAttributes.put(Queue.OWNER, getName()+"Owner");
-        queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
         queueAttributes.putAll(attributes);
         queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
 
@@ -361,13 +359,19 @@
     {
         AMQQueue queue = mock(AMQQueue.class);
         when(queue.getName()).thenReturn(queueName);
-        when(queue.getOwner()).thenReturn(queueOwner);
         when(queue.isExclusive()).thenReturn(exclusive);
         when(queue.getId()).thenReturn(_queueId);
         when(queue.getAlternateExchange()).thenReturn(alternateExchange);
-        if(arguments != null && !arguments.isEmpty())
+        final Map<String,Object> attributes = arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments);
+        attributes.put(Queue.NAME, queueName);
+        if(exclusive)
         {
-            when(queue.getAvailableAttributes()).thenReturn(arguments.keySet());
+            when(queue.getOwner()).thenReturn(queueOwner);
+
+            attributes.put(Queue.OWNER, queueOwner);
+            attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER);
+        }
+            when(queue.getAvailableAttributes()).thenReturn(attributes.keySet());
             final ArgumentCaptor<String> requestedAttribute = ArgumentCaptor.forClass(String.class);
             when(queue.getAttribute(requestedAttribute.capture())).then(
                     new Answer()
@@ -377,10 +381,9 @@
                         public Object answer(final InvocationOnMock invocation) throws Throwable
                         {
                             String attrName = requestedAttribute.getValue();
-                            return arguments.get(attrName);
+                            return attributes.get(attrName);
                         }
                     });
-        }
 
         return queue;
     }
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index fb63fef..f082d58 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -25,11 +25,13 @@
 import static org.mockito.Mockito.when;
 
 import java.net.SocketAddress;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -182,8 +184,10 @@
     public static AMQQueue createQueue(String queueName, VirtualHost virtualHost)
             throws QpidSecurityException, QueueExistsException
     {
-        AMQQueue queue = virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null,
-                false, false, false, Collections.<String, Object>emptyMap());
+        final Map<String,Object> queueAttributes = new HashMap<String, Object>(); // only ID and NAME are supplied
+        queueAttributes.put(Queue.NAME, queueName);
+        queueAttributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+        AMQQueue queue = virtualHost.createQueue(null, queueAttributes); // null first argument: no creating session is passed
         return queue;
     }
 
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index 223e2c5..1e25aac 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -46,6 +46,7 @@
 import org.apache.qpid.server.store.DurableConfigurationRecoverer;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
+import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
@@ -117,14 +118,11 @@
 
 
 
-        final ArgumentCaptor<UUID> idArg = ArgumentCaptor.forClass(UUID.class);
-        final ArgumentCaptor<String> queueArg = ArgumentCaptor.forClass(String.class);
-        final ArgumentCaptor<Map> argsArg = ArgumentCaptor.forClass(Map.class);
+        final ArgumentCaptor<Map> attributesArg = ArgumentCaptor.forClass(Map.class);
 
         _queueFactory = mock(QueueFactory.class);
 
-        when(_queueFactory.restoreQueue(idArg.capture(), queueArg.capture(),
-                anyString(), anyBoolean(), anyBoolean(), anyBoolean(), argsArg.capture())).then(
+        when(_queueFactory.restoreQueue(attributesArg.capture())).then(
                 new Answer()
                 {
 
@@ -133,8 +131,9 @@
                     {
                         final AMQQueue queue = mock(AMQQueue.class);
 
-                        final String queueName = queueArg.getValue();
-                        final UUID queueId = idArg.getValue();
+                        final Map attributes = attributesArg.getValue();
+                        final String queueName = (String) attributes.get(Queue.NAME);
+                        final UUID queueId = MapValueConverter.getUUIDAttribute(Queue.ID, attributes);
 
                         when(queue.getName()).thenReturn(queueName);
                         when(queue.getId()).thenReturn(queueId);
@@ -153,10 +152,10 @@
                                         return null;
                                     }
                                 }
-                        ).when(queue).setAlternateExchange(altExchangeArg.capture());
+                                ).when(queue).setAlternateExchange(altExchangeArg.capture());
 
-                        Map args = argsArg.getValue();
-                        if(args.containsKey(Queue.ALTERNATE_EXCHANGE))
+                        Map args = attributes;
+                        if (args.containsKey(Queue.ALTERNATE_EXCHANGE))
                         {
                             final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString());
                             final Exchange exchange = _exchangeRegistry.getExchange(exchangeId);
@@ -470,7 +469,6 @@
         {
             queue.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeId.toString());
         }
-        queue.put(Queue.EXCLUSIVE, false);
 
         return queue;
 
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index f7eecc7..cee7edf 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -29,6 +29,7 @@
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
@@ -152,14 +153,7 @@
     }
 
     @Override
-    public AMQQueue createQueue(UUID id,
-                                String queueName,
-                                boolean durable,
-                                String owner,
-                                boolean autoDelete,
-                                boolean exclusive,
-                                boolean deleteOnNoConsumer,
-                                Map<String, Object> arguments)
+    public AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> arguments) // mock: both arguments are unused, always returns null
     {
         return null;
     }
@@ -314,4 +308,52 @@
     public void unblock()
     {
     }
+
+    @Override
+    public long getDefaultAlertThresholdMessageAge() // mock stub: always reports 0
+    {
+        return 0;
+    }
+
+    @Override
+    public long getDefaultAlertThresholdMessageSize() // mock stub: always reports 0
+    {
+        return 0;
+    }
+
+    @Override
+    public long getDefaultAlertThresholdQueueDepthMessages() // mock stub: always reports 0
+    {
+        return 0;
+    }
+
+    @Override
+    public long getDefaultAlertThresholdQueueDepthBytes() // mock stub: always reports 0
+    {
+        return 0;
+    }
+
+    @Override
+    public long getDefaultAlertRepeatGap() // mock stub: always reports 0
+    {
+        return 0;
+    }
+
+    @Override
+    public long getDefaultQueueFlowControlSizeBytes() // mock stub: always reports 0
+    {
+        return 0;
+    }
+
+    @Override
+    public long getDefaultQueueFlowResumeSizeBytes() // mock stub: always reports 0
+    {
+        return 0;
+    }
+
+    @Override
+    public int getDefaultMaximumDeliveryAttempts() // mock stub: always reports 0
+    {
+        return 0;
+    }
 }
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index ff7ce0a..9c012eb 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -25,6 +25,7 @@
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.security.auth.Subject;
@@ -42,6 +43,7 @@
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Connection;
@@ -56,7 +58,8 @@
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
 
-public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
+public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>,
+                                                            LogSubject, AuthorizationHolder
 {
     private Runnable _onOpenTask;
     private AtomicBoolean _logClosed = new AtomicBoolean(false);
@@ -72,6 +75,10 @@
     private AtomicLong _lastIoTime = new AtomicLong();
     private boolean _blocking;
     private Transport _transport;
+
+    private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList =
+            new CopyOnWriteArrayList<Action<? super ServerConnection>>();
+
     private volatile boolean _stopped;
 
     public ServerConnection(final long connectionId, Broker broker)
@@ -197,7 +204,7 @@
         _onOpenTask = task;
     }
 
-    public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
+    public void closeSession(ServerSession session, AMQConstant cause, String message)
     {
         ExecutionException ex = new ExecutionException();
         ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
@@ -211,7 +218,7 @@
         }
         ex.setErrorCode(code);
         ex.setDescription(message);
-        ((ServerSession)session).invoke(ex);
+        session.invoke(ex);
 
         session.close(cause, message);
     }
@@ -315,6 +322,7 @@
     public void close(AMQConstant cause, String message)
     {
         closeSubscriptions();
+        performDeleteTasks();
         ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
         try
         {
@@ -327,6 +335,14 @@
         close(replyCode, message);
     }
 
+    protected void performDeleteTasks() // called from both close(AMQConstant, String) and closed(); each task must still run only once
+    {
+        for(Action<? super ServerConnection> task : _taskList) // CopyOnWriteArrayList iterates a snapshot, so removing inside the loop is safe
+        {
+            if(_taskList.remove(task)) { task.performAction(this); } // remove() succeeds for one caller only, so a second pass skips the task
+        }
+    }
+
     public synchronized void block()
     {
         if(!_blocking)
@@ -367,12 +383,12 @@
         super.removeSession(ssn);
     }
 
-    public List<AMQSessionModel> getSessionModels()
+    public List<ServerSession> getSessionModels() // returns a new list with one entry per channel returned by getChannels()
     {
-        List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+        List<ServerSession> sessions = new ArrayList<ServerSession>();
         for (Session ssn : getChannels())
         {
-            sessions.add((AMQSessionModel) ssn);
+            sessions.add((ServerSession) ssn); // NOTE(review): presumes every channel of a ServerConnection is a ServerSession - confirm
         }
         return sessions;
     }
@@ -475,14 +491,10 @@
         return String.valueOf(getRemoteAddress());
     }
 
-    public String getUserName()
-    {
-        return _authorizedPrincipal.getName();
-    }
-
     @Override
     public void closed()
     {
+        performDeleteTasks(); // run the registered connection delete tasks before subscriptions are closed
         closeSubscriptions();
         super.closed();
     }
@@ -522,6 +534,12 @@
     }
 
     @Override
+    public String getRemoteContainerName() // the container name is taken from the connection delegate's client id
+    {
+        return getConnectionDelegate().getClientId();
+    }
+
+    @Override
     public String getClientVersion()
     {
         return getConnectionDelegate().getClientVersion();
@@ -533,11 +551,6 @@
         return getConnectionDelegate().getClientProduct();
     }
 
-    public String getPrincipalAsString()
-    {
-        return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
-    }
-
     public long getSessionCountLimit()
     {
         return getChannelMax();
@@ -565,4 +578,16 @@
         super.doHeartBeat();
 
     }
+
+    @Override
+    public void addDeleteTask(final Action<? super ServerConnection> task) // registers task; performDeleteTasks() invokes it with this connection
+    {
+        _taskList.add(task);
+    }
+
+    @Override
+    public void removeDeleteTask(final Action<? super ServerConnection> task) // unregisters a task previously passed to addDeleteTask
+    {
+        _taskList.remove(task);
+    }
 }
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index c85a415..dc26249 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
+import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -310,14 +311,18 @@
     private boolean isSessionNameUnique(final byte[] name, final Connection conn)
     {
         final ServerConnection sconn = (ServerConnection) conn;
-        final String userId = sconn.getUserName();
+        final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal();
+        final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName();
 
         final Iterator<AMQConnectionModel> connections =
                         ((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator();
         while(connections.hasNext())
         {
-            final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next();
-            if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name))
+            final AMQConnectionModel amqConnectionModel = connections.next();
+            final String userName = amqConnectionModel.getAuthorizedPrincipal() == null
+                    ? ""
+                    : amqConnectionModel.getAuthorizedPrincipal().getName();
+            if (userId.equals(userName) && !amqConnectionModel.isSessionNameUnique(name))
             {
                 return false;
             }
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 0e6b4d3..29f9fc5 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -78,6 +78,7 @@
 import org.apache.qpid.server.txn.TimeoutDtxException;
 import org.apache.qpid.server.txn.UnknownDtxBranchException;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.Deletable;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.*;
 import org.slf4j.Logger;
@@ -88,7 +89,9 @@
 
 public class ServerSession extends Session
         implements AuthorizationHolder,
-                   AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
+                   AMQSessionModel<ServerSession,ServerConnection>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
+                   Deletable<ServerSession>
+
 {
     private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
 
@@ -132,7 +135,7 @@
 
     private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
 
-    private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>();
+    private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>();
 
     private final TransactionTimeoutHelper _transactionTimeoutHelper;
 
@@ -374,7 +377,7 @@
         }
         _messageDispositionListenerMap.clear();
 
-        for (Action<ServerSession> task : _taskList)
+        for (Action<? super ServerSession> task : _taskList)
         {
             task.performAction(this);
         }
@@ -610,12 +613,12 @@
         return getConnection().getAuthorizedSubject();
     }
 
-    public void addSessionCloseTask(Action<ServerSession> task)
+    public void addDeleteTask(Action<? super ServerSession> task) // appends task to _taskList; accepts actions typed on any supertype of ServerSession
     {
         _taskList.add(task);
     }
 
-    public void removeSessionCloseTask(Action<ServerSession> task)
+    public void removeDeleteTask(Action<? super ServerSession> task) // removes a previously added task from _taskList
     {
         _taskList.remove(task);
     }
@@ -652,7 +655,7 @@
         return _id;
     }
 
-    public AMQConnectionModel getConnectionModel()
+    public ServerConnection getConnectionModel() // return type narrowed to the concrete ServerConnection returned by getConnection()
     {
         return getConnection();
     }
@@ -922,7 +925,7 @@
     }
 
     @Override
-    public int compareTo(AMQSessionModel o)
+    public int compareTo(ServerSession o) // orders sessions by comparing their getId() values
     {
         return getId().compareTo(o.getId());
     }
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 1bd5053..b0a60be 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -25,6 +25,8 @@
 import java.util.UUID;
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
 import org.apache.qpid.server.exchange.Exchange;
@@ -204,47 +206,12 @@
                 {
                     exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
                 }
-                else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
-                {
-                    exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
-                }
-                else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session)
+                else if(!queue.verifySessionAccess((ServerSession)session))
                 {
                     exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
                 }
                 else
                 {
-                    if(queue.isExclusive())
-                    {
-                        ServerSession s = (ServerSession) session;
-                        queue.setExclusiveOwningSession(s);
-
-                        ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
-                        {
-                            public void performAction(ServerSession session)
-                            {
-                                if(queue.getExclusiveOwningSession() == session)
-                                {
-                                    queue.setExclusiveOwningSession(null);
-                                }
-                            }
-                        });
-
-                        if(queue.getAuthorizationHolder() == null)
-                        {
-                            queue.setAuthorizationHolder(s);
-                            ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
-                            {
-                                public void performAction(ServerSession session)
-                                {
-                                    if(queue.getAuthorizationHolder() == session)
-                                    {
-                                        queue.setAuthorizationHolder(null);
-                                    }
-                                }
-                            });
-                        }
-                    }
 
                     FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
 
@@ -302,6 +269,10 @@
                     {
                         exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
                     }
+                    catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+                    {
+                        exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an incompatible exclusivity policy");
+                    }
                 }
             }
         }
@@ -1197,7 +1168,7 @@
                 exception(session, method, errorCode, description);
 
             }
-            else if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+            else if (!queue.verifySessionAccess((ServerSession)session))
             {
                 String description = "Cannot declare queue('" + queueName + "'),"
                                                                        + " as exclusive queue with same name "
@@ -1214,7 +1185,6 @@
             try
             {
 
-                String owner = method.getExclusive() ? ((ServerSession)session).getClientID() : null;
                 final String alternateExchangeName = method.getAlternateExchange();
 
 
@@ -1227,66 +1197,36 @@
 
                 final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName());
 
-                final boolean deleteOnNoConsumer = !exclusive && autoDelete;
+                arguments.put(Queue.ID, id);
+                arguments.put(Queue.NAME, queueName);
 
-                queue = virtualHost.createQueue(id, queueName, method.getDurable(), owner,
-                                                autoDelete, exclusive, deleteOnNoConsumer,
-                                                arguments);
+                LifetimePolicy lifetime;
+                if(autoDelete)
+                {
+                    lifetime = exclusive ? LifetimePolicy.DELETE_ON_SESSION_END
+                            : LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
+                }
+                else
+                {
+                    lifetime = LifetimePolicy.PERMANENT;
+                }
 
-                if (autoDelete && exclusive)
-                {
-                    final AMQQueue q = queue;
-                    final Action<ServerSession> deleteQueueTask = new Action<ServerSession>()
-                        {
-                            public void performAction(ServerSession session)
-                            {
-                                try
-                                {
-                                    virtualHost.removeQueue(q);
-                                }
-                                catch (QpidSecurityException e)
-                                {
-                                    exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
-                                }
-                            }
-                        };
-                    final ServerSession s = (ServerSession) session;
-                    s.addSessionCloseTask(deleteQueueTask);
-                    queue.addQueueDeleteTask(new Action<AMQQueue>()
-                        {
-                            public void performAction(AMQQueue queue)
-                            {
-                                s.removeSessionCloseTask(deleteQueueTask);
-                            }
-                        });
-                }
-                if (exclusive)
-                {
-                    final AMQQueue q = queue;
-                    final Action<ServerSession> removeExclusive = new Action<ServerSession>()
-                    {
-                        public void performAction(ServerSession session)
-                        {
-                            q.setAuthorizationHolder(null);
-                            q.setExclusiveOwningSession(null);
-                        }
-                    };
-                    final ServerSession s = (ServerSession) session;
-                    q.setExclusiveOwningSession(s);
-                    s.addSessionCloseTask(removeExclusive);
-                    queue.addQueueDeleteTask(new Action<AMQQueue>()
-                    {
-                        public void performAction(AMQQueue queue)
-                        {
-                            s.removeSessionCloseTask(removeExclusive);
-                        }
-                    });
-                }
+                arguments.put(Queue.LIFETIME_POLICY, lifetime);
+
+                ExclusivityPolicy exclusivityPolicy = exclusive ? ExclusivityPolicy.SESSION : ExclusivityPolicy.NONE;
+
+
+                arguments.put(Queue.DURABLE, method.getDurable());
+
+                arguments.put(Queue.EXCLUSIVE, exclusivityPolicy);
+
+                queue = virtualHost.createQueue((ServerSession)session, arguments);
+
             }
             catch(QueueExistsException qe)
             {
                 queue = qe.getExistingQueue();
-                if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+                if (!queue.verifySessionAccess((ServerSession)session))
                 {
                     String description = "Cannot declare queue('" + queueName + "'),"
                                                                            + " as exclusive queue with same name "
@@ -1347,11 +1287,7 @@
             }
             else
             {
-                if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
-                {
-                    exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
-                }
-                else if(queue.isExclusive() && queue.getExclusiveOwningSession()  != null && queue.getExclusiveOwningSession() != session)
+                if(!queue.verifySessionAccess((ServerSession)session))
                 {
                     exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
                 }
@@ -1424,7 +1360,7 @@
             result.setQueue(queue.getName());
             result.setDurable(queue.isDurable());
             result.setExclusive(queue.isExclusive());
-            result.setAutoDelete(queue.isAutoDelete());
+            result.setAutoDelete(queue.getLifetimePolicy() != LifetimePolicy.PERMANENT);
             Map<String, Object> arguments = new LinkedHashMap<String, Object>();
             Collection<String> availableAttrs = queue.getAvailableAttributes();
 
@@ -1500,7 +1436,6 @@
     public void closed(Session session)
     {
         setThreadSubject(session);
-
         ServerSession serverSession = (ServerSession)session;
 
         serverSession.stopSubscriptions();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index b15b3f0..fe1cb62 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -22,6 +22,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -30,6 +31,8 @@
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQMethodBody;
@@ -86,7 +89,9 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.TransportException;
 
-public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder
+public class AMQChannel<T extends AMQProtocolSession<T>>
+        implements AMQSessionModel<AMQChannel<T>,T>,
+                   AsyncAutoCommitTransaction.FutureRecorder
 {
     public static final int DEFAULT_PREFETCH = 4096;
 
@@ -140,7 +145,7 @@
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
 
-    private final AMQProtocolSession _session;
+    private final T _session;
     private AtomicBoolean _closing = new AtomicBoolean(false);
 
     private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
@@ -163,12 +168,15 @@
     private final TransactionTimeoutHelper _transactionTimeoutHelper;
     private final UUID _id = UUID.randomUUID();
 
+    private final List<Action<? super AMQChannel<T>>> _taskList =
+            new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>();
+
 
     private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
     private final ImmediateAction _immediateAction = new ImmediateAction();
 
 
-    public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
+    public AMQChannel(T session, int channelId, MessageStore messageStore)
             throws AMQException
     {
         _session = session;
@@ -526,7 +534,8 @@
     public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
                                             FieldTable filters, boolean exclusive, boolean noLocal)
             throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive,
-                   MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException
+                   MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException,
+                   MessageSource.ConsumerAccessRefused
     {
         if (tag == null)
         {
@@ -580,7 +589,15 @@
                 {
                     filterManager = new SimpleFilterManager();
                 }
-                filterManager.add(new FilterSupport.NoLocalFilter(source));
+                final Object connectionReference = getConnectionReference();
+                filterManager.add(new MessageFilter()
+                {
+                    @Override
+                    public boolean matches(final Filterable message)
+                    {
+                        return message.getConnectionReference() != connectionReference;
+                    }
+                });
             }
             Consumer sub =
                     source.addConsumer(target,
@@ -609,6 +626,11 @@
             _tag2SubscriptionTargetMap.remove(tag);
             throw e;
         }
+        catch (MessageSource.ConsumerAccessRefused e)
+        {
+            _tag2SubscriptionTargetMap.remove(tag);
+            throw e;
+        }
         return tag;
     }
 
@@ -657,6 +679,13 @@
         CurrentActor.get().message(_logSubject, operationalLogMessage);
 
         unsubscribeAllConsumers();
+
+        for (Action<? super AMQChannel<T>> task : _taskList)
+        {
+            task.performAction(this);
+        }
+
+
         _transaction.rollback();
 
         try
@@ -692,9 +721,10 @@
 
             Consumer sub = me.getValue().getConsumer();
 
-
-            sub.close();
-
+            if(sub != null)
+            {
+                sub.close();
+            }
         }
 
         _tag2SubscriptionTargetMap.clear();
@@ -1192,7 +1222,8 @@
         return _id;
     }
 
-    public AMQConnectionModel getConnectionModel()
+    @Override
+    public T getConnectionModel()
     {
         return _session;
     }
@@ -1208,11 +1239,23 @@
     }
 
     @Override
-    public int compareTo(AMQSessionModel o)
+    public int compareTo(AMQChannel o)
     {
         return getId().compareTo(o.getId());
     }
 
+    @Override
+    public void addDeleteTask(final Action<? super AMQChannel<T>> task)
+    {
+        _taskList.add(task);
+    }
+
+    @Override
+    public void removeDeleteTask(final Action<? super AMQChannel<T>> task)
+    {
+        _taskList.remove(task);
+    }
+
 
     private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>>
     {
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 5e95701..68f1ad7 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -33,7 +33,6 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -47,18 +46,15 @@
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.*;
-import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.properties.ConnectionStartProperties;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
 import org.apache.qpid.server.logging.LogActor;
@@ -78,6 +74,7 @@
 import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -86,7 +83,7 @@
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.util.BytesDataOutput;
 
-public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession
+public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine>
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
 
@@ -103,9 +100,11 @@
 
     private VirtualHost _virtualHost;
 
-    private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+    private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap =
+            new HashMap<Integer, AMQChannel<AMQProtocolEngine>>();
 
-    private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+    @SuppressWarnings("unchecked")
+    private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
 
     /**
      * The channels that the latest call to {@link #received(ByteBuffer)} applied to.
@@ -114,9 +113,8 @@
      *
      * Thread-safety: guarded by {@link #_receivedLock}.
      */
-    private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<AMQChannel>();
-
-    private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
+    private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage =
+            new HashSet<AMQChannel<AMQProtocolEngine>>();
 
     private final AMQStateManager _stateManager;
 
@@ -124,10 +122,6 @@
 
     private SaslServer _saslServer;
 
-    private Object _lastReceived;
-
-    private Object _lastSent;
-
     private volatile boolean _closed;
 
     // maximum number of channels this session should have
@@ -136,8 +130,8 @@
     /* AMQP Version for this session */
     private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
     private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
-    private FieldTable _clientProperties;
-    private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+    private final List<Action<? super AMQProtocolEngine>> _taskList =
+            new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>();
 
     private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
     private ProtocolOutputConverter _protocolOutputConverter;
@@ -153,12 +147,9 @@
     private long _lastIoTime;
 
     private long _writtenBytes;
-    private long _readBytes;
-
 
     private long _maxFrameSize;
     private final AtomicBoolean _closing = new AtomicBoolean(false);
-    private long _createTime = System.currentTimeMillis();
 
     private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
@@ -176,6 +167,7 @@
 
     private volatile boolean _closeWhenNoRoute;
     private volatile boolean _stopped;
+    private long _readBytes;
 
     public AMQProtocolEngine(Broker broker,
                              NetworkConnection network,
@@ -258,15 +250,14 @@
         final long arrivalTime = System.currentTimeMillis();
         _lastReceivedTime = arrivalTime;
         _lastIoTime = arrivalTime;
+        _readBytes += msg.remaining();
 
         _receivedLock.lock();
         try
         {
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
-            final int len = dataBlocks.size();
-            for (int i = 0; i < len; i++)
+            for (AMQDataBlock dataBlock : dataBlocks)
             {
-                AMQDataBlock dataBlock = dataBlocks.get(i);
                 try
                 {
                     dataBlockReceived(dataBlock);
@@ -316,7 +307,7 @@
 
     private void receivedComplete()
     {
-        for (AMQChannel channel : _channelsForCurrentMessage)
+        for (AMQChannel<AMQProtocolEngine> channel : _channelsForCurrentMessage)
         {
             channel.receivedComplete();
         }
@@ -334,7 +325,6 @@
      */
     private void dataBlockReceived(AMQDataBlock message) throws Exception
     {
-        _lastReceived = message;
         if (message instanceof ProtocolInitiation)
         {
             protocolInitiationReceived((ProtocolInitiation) message);
@@ -363,7 +353,7 @@
     private void frameReceived(AMQFrame frame) throws AMQException
     {
         int channelId = frame.getChannel();
-        AMQChannel amqChannel = _channelMap.get(channelId);
+        AMQChannel<AMQProtocolEngine> amqChannel = _channelMap.get(channelId);
         if(amqChannel != null)
         {
             // The _receivedLock is already acquired in the caller
@@ -558,14 +548,6 @@
             {
                 boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
 
-                if (!_frameListeners.isEmpty())
-                {
-                    for (AMQMethodListener listener : _frameListeners)
-                    {
-                        wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
-                    }
-                }
-
                 if (!wasAnyoneInterested)
                 {
                     throw new AMQNoMethodHandlerException(evt);
@@ -611,11 +593,6 @@
         }
         catch (Exception e)
         {
-            for (AMQMethodListener listener : _frameListeners)
-            {
-                listener.error(e);
-            }
-
             _logger.error("Unexpected exception while processing frame.  Closing connection.", e);
 
             closeProtocolSession();
@@ -625,7 +602,7 @@
     public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
     {
 
-        AMQChannel channel = getAndAssertChannel(channelId);
+        AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
 
         channel.publishContentHeader(body);
 
@@ -633,7 +610,7 @@
 
     public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
     {
-        AMQChannel channel = getAndAssertChannel(channelId);
+        AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
 
         channel.publishContentBody(body);
     }
@@ -681,17 +658,17 @@
         _contextKey = contextKey;
     }
 
-    public List<AMQChannel> getChannels()
+    public List<AMQChannel<AMQProtocolEngine>> getChannels()
     {
         synchronized (_channelMap)
         {
-            return new ArrayList<AMQChannel>(_channelMap.values());
+            return new ArrayList<AMQChannel<AMQProtocolEngine>>(_channelMap.values());
         }
     }
 
-    public AMQChannel getAndAssertChannel(int channelId) throws AMQException
+    public AMQChannel<AMQProtocolEngine> getAndAssertChannel(int channelId) throws AMQException
     {
-        AMQChannel channel = getChannel(channelId);
+        AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
         if (channel == null)
         {
             throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
@@ -700,9 +677,9 @@
         return channel;
     }
 
-    public AMQChannel getChannel(int channelId)
+    public AMQChannel<AMQProtocolEngine> getChannel(int channelId)
     {
-        final AMQChannel channel =
+        final AMQChannel<AMQProtocolEngine> channel =
                 ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
         if ((channel == null) || channel.isClosing())
         {
@@ -719,7 +696,7 @@
         return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
     }
 
-    public void addChannel(AMQChannel channel) throws AMQException
+    public void addChannel(AMQChannel<AMQProtocolEngine> channel) throws AMQException
     {
         if (_closed)
         {
@@ -770,7 +747,7 @@
         _maxNoOfChannels = value;
     }
 
-    public void commitTransactions(AMQChannel channel) throws AMQException
+    public void commitTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException
     {
         if ((channel != null) && channel.isTransactional())
         {
@@ -778,7 +755,7 @@
         }
     }
 
-    public void rollbackTransactions(AMQChannel channel) throws AMQException
+    public void rollbackTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException
     {
         if ((channel != null) && channel.isTransactional())
         {
@@ -802,7 +779,7 @@
 
     public void closeChannel(int channelId, AMQConstant cause, String message)
     {
-        final AMQChannel channel = getChannel(channelId);
+        final AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
         if (channel == null)
         {
             throw new IllegalArgumentException("Unknown channel id");
@@ -879,12 +856,10 @@
 
     /**
      * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel.
-     *
-     * @throws AMQException if an error occurs while closing any channel
      */
     private void closeAllChannels()
     {
-        for (AMQChannel channel : getChannels())
+        for (AMQChannel<AMQProtocolEngine> channel : getChannels())
         {
             channel.close();
         }
@@ -929,9 +904,9 @@
 
                 closeAllChannels();
 
-                for (Task task : _taskList)
+                for (Action<? super AMQProtocolEngine> task : _taskList)
                 {
-                    task.doTask(this);
+                    task.performAction(this);
                 }
 
                 synchronized(this)
@@ -961,7 +936,7 @@
                     }
                     catch (InterruptedException e)
                     {
-
+                        // do nothing
                     }
                     finally
                     {
@@ -1027,11 +1002,6 @@
         return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")");
     }
 
-    public String dump()
-    {
-        return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived;
-    }
-
     /** @return an object that can be used to identity */
     public Object getKey()
     {
@@ -1069,10 +1039,9 @@
 
     public void setClientProperties(FieldTable clientProperties)
     {
-        _clientProperties = clientProperties;
-        if (_clientProperties != null)
+        if (clientProperties != null)
         {
-            String closeWhenNoRoute = _clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE);
+            String closeWhenNoRoute = clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE);
             if (closeWhenNoRoute != null)
             {
                 _closeWhenNoRoute = Boolean.parseBoolean(closeWhenNoRoute);
@@ -1082,10 +1051,10 @@
                 }
             }
 
-            _clientVersion = _clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
-            _clientProduct = _clientProperties.getString(ConnectionStartProperties.PRODUCT);
+            _clientVersion = clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
+            _clientProduct = clientProperties.getString(ConnectionStartProperties.PRODUCT);
 
-            String clientId = _clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8);
+            String clientId = clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8);
             if (clientId != null)
             {
                 setContextKey(new AMQShortString(clientId));
@@ -1118,11 +1087,6 @@
         return _protocolVersion.getMinorVersion();
     }
 
-    public boolean isProtocolVersion(byte major, byte minor)
-    {
-        return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor);
-    }
-
     public MethodRegistry getRegistry()
     {
         return getMethodRegistry();
@@ -1141,12 +1105,12 @@
 
     }
 
-    public void addSessionCloseTask(Task task)
+    public void addDeleteTask(Action<? super AMQProtocolEngine> task)
     {
         _taskList.add(task);
     }
 
-    public void removeSessionCloseTask(Task task)
+    public void removeDeleteTask(Action<? super AMQProtocolEngine> task)
     {
         _taskList.remove(task);
     }
@@ -1341,51 +1305,11 @@
         return _clientProduct;
     }
 
-    public String getPrincipalAsString()
-    {
-        return getAuthId();
-    }
-
     public long getSessionCountLimit()
     {
         return getMaximumNumberOfChannels();
     }
 
-    public Boolean isIncoming()
-    {
-        return true;
-    }
-
-    public Boolean isSystemConnection()
-    {
-        return false;
-    }
-
-    public Boolean isFederationLink()
-    {
-        return false;
-    }
-
-    public String getAuthId()
-    {
-        return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
-    }
-
-    public Integer getRemotePID()
-    {
-        return null;
-    }
-
-    public String getRemoteProcessName()
-    {
-        return null;
-    }
-
-    public Integer getRemoteParentPID()
-    {
-        return null;
-    }
-
     public boolean isDurable()
     {
         return false;
@@ -1401,52 +1325,6 @@
         return String.valueOf(getRemoteAddress());
     }
 
-    public long getCreateTime()
-    {
-        return _createTime;
-    }
-
-    public Boolean isShadow()
-    {
-        return false;
-    }
-
-    public void mgmtClose()
-    {
-        MethodRegistry methodRegistry = getMethodRegistry();
-        ConnectionCloseBody responseBody =
-                methodRegistry.createConnectionCloseBody(
-                        AMQConstant.REPLY_SUCCESS.getCode(),
-                        new AMQShortString("The connection was closed using the broker's management interface."),
-                        0,0);
-
-        // This seems ugly but because we use closeConnection in both normal
-        // broker operation and as part of the management interface it cannot
-        // be avoided. The Current Actor will be null when this method is
-        // called via the QMF management interface. As such we need to set one.
-        boolean removeActor = false;
-        if (CurrentActor.get() == null)
-        {
-            removeActor = true;
-            CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger()));
-        }
-
-        try
-        {
-            writeFrame(responseBody.generateFrame(0));
-
-            closeSession();
-
-        }
-        finally
-        {
-            if (removeActor)
-            {
-                CurrentActor.remove();
-            }
-        }
-    }
-
     public void mgmtCloseChannel(int channelId)
     {
         MethodRegistry methodRegistry = getMethodRegistry();
@@ -1481,14 +1359,9 @@
         }
     }
 
-    public String getClientID()
+    public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message)
     {
-        return getContextKey().toString();
-    }
-
-    public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
-    {
-        int channelId = ((AMQChannel)session).getChannelId();
+        int channelId = session.getChannelId();
         closeChannel(channelId, cause, message);
 
         MethodRegistry methodRegistry = getMethodRegistry();
@@ -1506,7 +1379,7 @@
         closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
 		                getProtocolOutputConverter().getProtocolMajorVersion(),
 		                getProtocolOutputConverter().getProtocolMinorVersion(),
-		                (Throwable) null));
+		                null));
     }
 
     public void block()
@@ -1516,7 +1389,7 @@
             if(!_blocking)
             {
                 _blocking = true;
-                for(AMQChannel channel : _channelMap.values())
+                for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
                 {
                     channel.block();
                 }
@@ -1531,7 +1404,7 @@
             if(_blocking)
             {
                 _blocking = false;
-                for(AMQChannel channel : _channelMap.values())
+                for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
                 {
                     channel.unblock();
                 }
@@ -1544,9 +1417,9 @@
         return _closed;
     }
 
-    public List<AMQSessionModel> getSessionModels()
+    public List<AMQChannel<AMQProtocolEngine>> getSessionModels()
     {
-		return new ArrayList<AMQSessionModel>(getChannels());
+		return new ArrayList<AMQChannel<AMQProtocolEngine>>(getChannels());
     }
 
     public LogSubject getLogSubject()
@@ -1620,16 +1493,17 @@
         return String.valueOf(getContextKey());
     }
 
+    @Override
+    public String getRemoteContainerName()
+    {
+        return String.valueOf(getContextKey());
+    }
+
     public void setDeferFlush(boolean deferFlush)
     {
         _deferFlush = deferFlush;
     }
 
-    public String getUserName()
-    {
-        return getAuthorizedPrincipal().getName();
-    }
-
     public final class WriteDeliverMethod
             implements ClientDeliveryMethod
     {
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
index 58a3b5d..045367b 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
@@ -37,12 +37,14 @@
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 
-public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel
+public interface AMQProtocolSession<T extends AMQProtocolSession<T>>
+        extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel<T,AMQChannel<T>>
 {
     long getSessionID();
 
@@ -69,11 +71,6 @@
      */
     SocketAddress getLocalAddress();
 
-    public static interface Task
-    {
-        public void doTask(AMQProtocolSession session);
-    }
-
     /**
      * Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC
      * 6).
@@ -98,7 +95,7 @@
      *
      * @return null if no channel exists, the channel otherwise
      */
-    AMQChannel getChannel(int channelId);
+    AMQChannel<T> getChannel(int channelId);
 
     /**
      * Associate a channel with this session.
@@ -106,7 +103,7 @@
      * @param channel the channel to associate with this session. It is an error to associate the same channel with more
      *                than one session but this is not validated.
      */
-    void addChannel(AMQChannel channel) throws AMQException;
+    void addChannel(AMQChannel<T> channel) throws AMQException;
 
     /**
      * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
@@ -185,10 +182,6 @@
 
     void setVirtualHost(VirtualHost virtualHost) throws AMQException;
 
-    void addSessionCloseTask(Task task);
-
-    void removeSessionCloseTask(Task task);
-
     public ProtocolOutputConverter getProtocolOutputConverter();
 
     void setAuthorizedSubject(Subject authorizedSubject);
@@ -209,11 +202,11 @@
 
     void setMaximumNumberOfChannels(Long value);
 
-    void commitTransactions(AMQChannel channel) throws AMQException;
+    void commitTransactions(AMQChannel<T> channel) throws AMQException;
 
-    void rollbackTransactions(AMQChannel channel) throws AMQException;
+    void rollbackTransactions(AMQChannel<T> channel) throws AMQException;
 
-    List<AMQChannel> getChannels();
+    List<AMQChannel<T>> getChannels();
 
     void mgmtCloseChannel(int channelId);
 
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
index ce90de7..c93c164 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
@@ -99,16 +99,6 @@
             {
                 final AMQShortString consumerTagName;
 
-                if (queue.isExclusive() && !queue.isDurable())
-                {
-                    AMQSessionModel session = queue.getExclusiveOwningSession();
-                    if (session == null || session.getConnectionModel() != protocolConnection)
-                    {
-                        throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                          "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
-                    }
-                }
-
                 if (body.getConsumerTag() != null)
                 {
                     consumerTagName = body.getConsumerTag().intern(false);
@@ -184,6 +174,13 @@
                                                    + queue.getName()
                                                    + " permission denied");
                 }
+                catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+                {
+                    throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
+                                                   "Cannot subscribe to queue "
+                                                   + queue.getName()
+                                                   + " as it already has an incompatible exclusivity policy");
+                }
 
             }
         }
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index c9a7cc6..4370004 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -98,15 +98,6 @@
             }
             else
             {
-                if (queue.isExclusive())
-                {
-                    AMQSessionModel session = queue.getExclusiveOwningSession();
-                    if (session == null || session.getConnectionModel() != protocolConnection)
-                    {
-                        throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                          "Queue is exclusive, but not created on this Connection.");
-                    }
-                }
 
                 try
                 {
@@ -136,6 +127,11 @@
                                                       "The GET request has been evaluated as an exclusive consumer, " +
                                                       "this is likely due to a programming error in the Qpid broker");
                 }
+                catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+                {
+                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+                                                      "Queue has an incompatible exclusivity policy");
+                }
             }
         }
     }
@@ -145,7 +141,7 @@
                                      final AMQChannel channel,
                                      final boolean acks)
             throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive,
-                   MessageSource.ExistingExclusiveConsumer
+                   MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused
     {
 
         final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
index 401718d..9b875cc 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
@@ -116,15 +116,6 @@
 
         try
         {
-            if (queue.isExclusive() && !queue.isDurable())
-            {
-                AMQSessionModel session = queue.getExclusiveOwningSession();
-                if (session == null || session.getConnectionModel() != protocolConnection)
-                {
-                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                      "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
-                }
-            }
 
             Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments());
             String bindingKey = String.valueOf(routingKey);
@@ -144,10 +135,6 @@
                 }
             }
         }
-        catch (AMQException e)
-        {
-            throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
-        }
         catch (QpidSecurityException e)
         {
             throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
index 5b55256..215e3f2 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
@@ -29,6 +29,9 @@
 import org.apache.qpid.framing.QueueDeclareBody;
 import org.apache.qpid.framing.QueueDeclareOkBody;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -39,7 +42,6 @@
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
@@ -96,9 +98,7 @@
             }
             else
             {
-                AMQSessionModel owningSession = queue.getExclusiveOwningSession();
-                if (queue.isExclusive() && !queue.isDurable()
-                    && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
+                if (!queue.verifySessionAccess(channel))
                 {
                     throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
                                                       "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
@@ -114,42 +114,15 @@
             try
             {
 
-                queue = createQueue(queueName, body, virtualHost, protocolConnection);
-                queue.setAuthorizationHolder(protocolConnection);
-
-                if (body.getExclusive())
-                {
-                    queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId));
-                    queue.setAuthorizationHolder(protocolConnection);
-
-                    if(!body.getDurable())
-                    {
-                        final AMQQueue q = queue;
-                        final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
-                        {
-                            public void doTask(AMQProtocolSession session)
-                            {
-                                q.setExclusiveOwningSession(null);
-                            }
-                        };
-                        protocolConnection.addSessionCloseTask(sessionCloseTask);
-                        queue.addQueueDeleteTask(new Action<AMQQueue>() {
-                            public void performAction(AMQQueue queue)
-                            {
-                                protocolConnection.removeSessionCloseTask(sessionCloseTask);
-                            }
-                        });
-                    }
-                }
+                queue = createQueue(channel, queueName, body, virtualHost, protocolConnection);
 
             }
             catch(QueueExistsException qe)
             {
 
                 queue = qe.getExistingQueue();
-                AMQSessionModel owningSession = queue.getExclusiveOwningSession();
 
-                if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
+                if (!queue.verifySessionAccess(channel))
                 {
                     throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
                                                       "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
@@ -161,19 +134,12 @@
                             "Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: "
                             + queue.isExclusive() + " requested " + body.getExclusive() + ")");
                 }
-                else if (body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection)))
-                {
-                    throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), "
-                                                                               + "as exclusive queue with same name "
-                                                                               + "declared on another client ID('"
-                                                                               + queue.getOwner() + "') your clientID('" + session.getClientID() + "')");
-
-                }
-                else if(queue.isAutoDelete() != body.getAutoDelete())
+                else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
+                    || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT)))
                 {
                     throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
-                                                      "Cannot re-declare queue '" + queue.getName() + "' with different auto-delete (was: "
-                                                        + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")");
+                                                      "Cannot re-declare queue '" + queue.getName() + "' with different lifetime policy (was: "
+                                                        + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")");
                 }
                 else if(queue.isDurable() != body.getDurable())
                 {
@@ -211,7 +177,7 @@
         return new AMQShortString("tmp_" + UUID.randomUUID());
     }
 
-    protected AMQQueue createQueue(final AMQShortString queueName,
+    protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName,
                                    QueueDeclareBody body,
                                    final VirtualHost virtualHost,
                                    final AMQProtocolSession session)
@@ -222,47 +188,35 @@
         final boolean autoDelete = body.getAutoDelete();
         final boolean exclusive = body.getExclusive();
 
-        String owner = exclusive ? AMQShortString.toString(session.getContextKey()) : null;
 
-        Map<String, Object> arguments =
+        Map<String, Object> attributes =
                 QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments()));
-        String queueNameString = AMQShortString.toString(queueName);
-        final UUID id = UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName());
+        final String queueNameString = AMQShortString.toString(queueName);
+        attributes.put(Queue.NAME, queueNameString);
+        attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()));
+        attributes.put(Queue.DURABLE, durable);
 
-        final AMQQueue queue = virtualHost.createQueue(id, queueNameString, durable, owner, autoDelete,
-                exclusive, autoDelete, arguments);
+        LifetimePolicy lifetimePolicy;
+        ExclusivityPolicy exclusivityPolicy;
 
-        if (exclusive && !durable)
+        if(exclusive)
         {
-            final AMQProtocolSession.Task deleteQueueTask =
-                    new AMQProtocolSession.Task()
-                    {
-                        public void doTask(AMQProtocolSession session)
-                        {
-                            if (virtualHost.getQueue(queueName.toString()) == queue)
-                            {
-                                try
-                                {
-                                    virtualHost.removeQueue(queue);
-                                }
-                                catch (QpidSecurityException e)
-                                {
-                                    throw new ConnectionScopedRuntimeException("Permission exception: Unable to remove a temporary queue created by a session which has now removed itself", e);
-                                }
-                            }
-                        }
-                    };
-
-            session.addSessionCloseTask(deleteQueueTask);
-
-            queue.addQueueDeleteTask(new Action<AMQQueue>()
-            {
-                public void performAction(AMQQueue queue)
-                {
-                    session.removeSessionCloseTask(deleteQueueTask);
-                }
-            });
+            lifetimePolicy = autoDelete
+                    ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+                    : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+            exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
         }
+        else
+        {
+            lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
+            exclusivityPolicy = ExclusivityPolicy.NONE;
+        }
+
+        attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+        attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+
+
+        final AMQQueue queue = virtualHost.createQueue(channel, attributes);
 
         return queue;
     }
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
index 3a9a6dc..c939e49 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
@@ -105,8 +105,7 @@
             }
             else
             {
-                AMQSessionModel session = queue.getExclusiveOwningSession();
-                if (queue.isExclusive() && !queue.isDurable() && (session == null || session.getConnectionModel() != protocolConnection))
+                if (!queue.verifySessionAccess(channel))
                 {
                     throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
                                                       "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
index 6d8f8e6..9d035a3 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
@@ -96,9 +96,7 @@
         }
         else
         {
-                AMQSessionModel session = queue.getExclusiveOwningSession();
-
-                if (queue.isExclusive() && (session == null || session.getConnectionModel() != protocolConnection))
+                if (!queue.verifySessionAccess(channel))
                 {
                     throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
                                                       "Queue is exclusive, but not created on this Connection.");
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index 7661d98..f8888f9 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -223,7 +223,7 @@
         //  Then the AMQMinaProtocolSession can join on the returning future without a NPE.
     }
 
-    public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
+    public void closeSession(AMQChannel session, AMQConstant cause, String message)
     {
         super.closeSession(session, cause, message);
 
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index cae61f9..7f8237c 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.security.Principal;
 import java.text.MessageFormat;
 import java.util.Collection;
 import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
@@ -42,7 +43,7 @@
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
 
-public class Connection_1_0 implements ConnectionEventListener
+public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0>
 {
 
     private final Port _port;
@@ -53,8 +54,33 @@
     private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
     private final Object _reference = new Object();
 
-    private List<Action<Connection_1_0>> _closeTasks =
-            Collections.synchronizedList(new ArrayList<Action<Connection_1_0>>());
+
+    private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
+    private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
+    private StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter();
+    private StatisticsCounter _dataReceiptStatistics = new StatisticsCounter();
+
+    private final LogSubject _logSubject = new LogSubject()
+    {
+        @Override
+        public String toLogString()
+        {
+            return "[" +
+                   MessageFormat.format(CONNECTION_FORMAT,
+                                        getConnectionId(),
+                                        getClientId(),
+                                        getRemoteAddressString(),
+                                        _vhost.getName())
+                   + "] ";
+
+        }
+    };
+
+    private volatile boolean _stopped;
+
+
+    private List<Action<? super Connection_1_0>> _closeTasks =
+            Collections.synchronizedList(new ArrayList<Action<? super Connection_1_0>>());
 
 
 
@@ -69,7 +95,7 @@
         _transport = transport;
         _conn = conn;
         _connectionId = connectionId;
-        _vhost.getConnectionRegistry().registerConnection(_model);
+        _vhost.getConnectionRegistry().registerConnection(this);
 
     }
 
@@ -80,7 +106,7 @@
 
     public void remoteSessionCreation(SessionEndpoint endpoint)
     {
-        Session_1_0 session = new Session_1_0(_vhost, this);
+        Session_1_0 session = new Session_1_0(_vhost, this, endpoint);
         _sessions.add(session);
         endpoint.setSessionEventListener(session);
     }
@@ -90,24 +116,24 @@
         _sessions.remove(session);
     }
 
-    void removeConnectionCloseTask(final Action<Connection_1_0> task)
+    public void removeDeleteTask(final Action<? super Connection_1_0> task)
     {
         _closeTasks.remove( task );
     }
 
-    void addConnectionCloseTask(final Action<Connection_1_0> task)
+    public void addDeleteTask(final Action<? super Connection_1_0> task)
     {
         _closeTasks.add( task );
     }
 
     public void closeReceived()
     {
-        List<Action<Connection_1_0>> taskCopy;
+        List<Action<? super Connection_1_0>> taskCopy;
         synchronized (_closeTasks)
         {
-            taskCopy = new ArrayList<Action<Connection_1_0>>(_closeTasks);
+            taskCopy = new ArrayList<Action<? super Connection_1_0>>(_closeTasks);
         }
-        for(Action<Connection_1_0> task : taskCopy)
+        for(Action<? super Connection_1_0> task : taskCopy)
         {
             task.performAction(this);
         }
@@ -115,7 +141,7 @@
         {
             _closeTasks.clear();
         }
-        _vhost.getConnectionRegistry().deregisterConnection(_model);
+        _vhost.getConnectionRegistry().deregisterConnection(this);
 
 
     }
@@ -125,30 +151,6 @@
         closeReceived();
     }
 
-    private final AMQConnectionModel _model = new AMQConnectionModel()
-    {
-        private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
-        private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
-        private StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter();
-        private StatisticsCounter _dataReceiptStatistics = new StatisticsCounter();
-
-        private final LogSubject _logSubject = new LogSubject()
-        {
-            @Override
-            public String toLogString()
-            {
-                return "[" +
-                        MessageFormat.format(CONNECTION_FORMAT,
-                                             getConnectionId(),
-                                             getClientId(),
-                                             getRemoteAddressString(),
-                                             _vhost.getName())
-                     + "] ";
-
-            }
-        };
-
-        private volatile boolean _stopped;
 
         @Override
         public void close(AMQConstant cause, String message)
@@ -169,9 +171,9 @@
         }
 
         @Override
-        public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
+        public void closeSession(Session_1_0 session, AMQConstant cause, String message)
         {
-            // TODO
+            session.close(cause, message);
         }
 
         @Override
@@ -181,9 +183,9 @@
         }
 
         @Override
-        public List<AMQSessionModel> getSessionModels()
+        public List<Session_1_0> getSessionModels()
         {
-            return new ArrayList<AMQSessionModel>(_sessions);
+            return new ArrayList<Session_1_0>(_sessions);
         }
 
         @Override
@@ -193,12 +195,6 @@
         }
 
         @Override
-        public String getUserName()
-        {
-            return getPrincipalAsString();
-        }
-
-        @Override
         public boolean isSessionNameUnique(byte[] name)
         {
             return true;  // TODO
@@ -216,7 +212,13 @@
             return _conn.getRemoteContainerId();
         }
 
-        @Override
+    @Override
+    public String getRemoteContainerName()
+    {
+        return _conn.getRemoteContainerId();
+    }
+
+    @Override
         public String getClientVersion()
         {
             return "";  //TODO
@@ -228,10 +230,9 @@
             return "";  //TODO
         }
 
-        @Override
-        public String getPrincipalAsString()
+        public Principal getAuthorizedPrincipal()
         {
-            return String.valueOf(_conn.getUser());
+            return _conn.getUser();
         }
 
         @Override
@@ -337,11 +338,10 @@
         }
 
 
-    };
 
     AMQConnectionModel getModel()
     {
-        return _model;
+        return this;
     }
 
 
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 546cc79..f7e2d2d 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -30,6 +30,9 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
 import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
@@ -207,15 +210,14 @@
 
                 if(queue == null)
                 {
-                    queue = _vhost.createQueue(
-                                UUIDGenerator.generateQueueUUID(name, _vhost.getName()),
-                                name,
-                                isDurable,
-                                null,
-                                true,
-                                true,
-                                true,
-                                Collections.EMPTY_MAP);
+                    Map<String,Object> attributes = new HashMap<String,Object>();
+                    attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(name, _vhost.getName()));
+                    attributes.put(Queue.NAME, name);
+                    attributes.put(Queue.DURABLE, isDurable);
+                    attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+                    attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.LINK);
+
+                    queue = _vhost.createQueue(getSession(), attributes);
                 }
                 else
                 {
@@ -308,44 +310,6 @@
                 exchange.addBinding(binding, queue,null);
                 source.setDistributionMode(StdDistMode.COPY);
 
-                if(!isDurable)
-                {
-                    final String queueName = name;
-                    final AMQQueue tempQueue = queue;
-
-                    final Action<Connection_1_0> deleteQueueTask =
-                                            new Action<Connection_1_0>()
-                                            {
-                                                public void performAction(Connection_1_0 session)
-                                                {
-                                                    if (_vhost.getQueue(queueName) == tempQueue)
-                                                    {
-                                                        try
-                                                        {
-                                                            _vhost.removeQueue(tempQueue);
-                                                        }
-                                                        catch (QpidSecurityException e)
-                                                        {
-                                                            //TODO
-                                                            _logger.error("Error removing queue", e);
-                                                        }
-                                                    }
-                                                }
-                                            };
-
-                                    getSession().getConnection().addConnectionCloseTask(deleteQueueTask);
-
-                                    queue.addQueueDeleteTask(new Action<AMQQueue>()
-                                    {
-                                        public void performAction(AMQQueue queue)
-                                        {
-                                            getSession().getConnection().removeConnectionCloseTask(deleteQueueTask);
-                                        }
-
-
-                                    });
-                }
-
                 qd = new QueueDestination(queue);
             }
             catch (QpidSecurityException e)
@@ -409,6 +373,11 @@
                 _logger.info("Cannot add an exclusive consumer to the destination as there is already a consumer");
                 throw new ConnectionScopedRuntimeException(e);
             }
+            catch (MessageSource.ConsumerAccessRefused e)
+            {
+                _logger.info("Cannot add an exclusive consumer to the destination as there is an incompatible exclusivity policy");
+                throw new ConnectionScopedRuntimeException(e);
+            }
         }
 
     }
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index c055d1e..6840c73 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -26,8 +26,10 @@
 import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
 import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.LifetimePolicy;
 import org.apache.qpid.amqp_1_0.type.messaging.*;
 import org.apache.qpid.amqp_1_0.type.messaging.Source;
 import org.apache.qpid.amqp_1_0.type.messaging.Target;
@@ -36,13 +38,13 @@
 import org.apache.qpid.amqp_1_0.type.transport.*;
 
 import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.server.model.*;
 import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
@@ -55,25 +57,34 @@
 import org.apache.qpid.server.virtualhost.QueueExistsException;
 
 import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
 
-public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject
+public class Session_1_0 implements SessionEventListener, AMQSessionModel<Session_1_0, Connection_1_0>, LogSubject
 {
     private static final Logger _logger = Logger.getLogger(Session_1_0.class);
     private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
+    private final SessionEndpoint _endpoint;
     private VirtualHost _vhost;
     private AutoCommitTransaction _transaction;
 
     private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
             new LinkedHashMap<Integer, ServerTransaction>();
+
+    private final CopyOnWriteArrayList<Action<? super Session_1_0>> _taskList =
+            new CopyOnWriteArrayList<Action<? super Session_1_0>>();
+
     private final Connection_1_0 _connection;
     private UUID _id = UUID.randomUUID();
+    private AtomicBoolean _closed = new AtomicBoolean();
 
 
-    public Session_1_0(VirtualHost vhost, final Connection_1_0 connection)
+    public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint)
     {
         _vhost = vhost;
+        _endpoint = endpoint;
         _transaction = new AutoCommitTransaction(vhost.getMessageStore());
         _connection = connection;
 
@@ -333,64 +344,41 @@
             LifetimePolicy lifetimePolicy = properties == null
                                             ? null
                                             : (LifetimePolicy) properties.get(LIFETIME_POLICY);
+            Map<String,Object> attributes = new HashMap<String,Object>();
+            attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()));
+            attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName);
+            attributes.put(org.apache.qpid.server.model.Queue.DURABLE, false);
 
-            final AMQQueue tempQueue = queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()),
-                                                                   queueName,
-                                                                   false, // durable
-                                                                   null, // owner
-                                                                   false, // autodelete
-                                                                   false, // exclusive
-                                                                   false,
-                                                                   properties);
-
-
-
-            if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose)
+            if(lifetimePolicy instanceof DeleteOnNoLinks)
             {
-                final Action<Connection_1_0> deleteQueueTask =
-                        new Action<Connection_1_0>()
-                        {
-                            public void performAction(Connection_1_0 session)
-                            {
-                                if (_vhost.getQueue(queueName) == tempQueue)
-                                {
-                                    try
-                                    {
-                                        _vhost.removeQueue(tempQueue);
-                                    }
-                                    catch (QpidSecurityException e)
-                                    {
-                                        //TODO
-                                        _logger.error("Error removing queue from vhost", e);
-                                    }
-                                }
-                            }
-                        };
-
-                _connection.addConnectionCloseTask(deleteQueueTask);
-
-                queue.addQueueDeleteTask(new Action<AMQQueue>()
-                {
-                    public void performAction(AMQQueue queue)
-                    {
-                        _connection.removeConnectionCloseTask(deleteQueueTask);
-                    }
-
-
-                });
-            }
-            else if(lifetimePolicy instanceof DeleteOnNoLinks)
-            {
-
-            }
-            else if(lifetimePolicy instanceof DeleteOnNoMessages)
-            {
-
+                attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
+                               org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_LINKS);
             }
             else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages)
             {
-
+                attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
+                               org.apache.qpid.server.model.LifetimePolicy.IN_USE);
             }
+            else if(lifetimePolicy instanceof DeleteOnClose)
+            {
+                attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
+                               org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
+            }
+            else if(lifetimePolicy instanceof DeleteOnNoMessages)
+            {
+                attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
+                               org.apache.qpid.server.model.LifetimePolicy.IN_USE);
+            }
+            else
+            {
+                attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
+                               org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
+            }
+
+
+            // TODO convert AMQP 1-0 node properties to queue attributes
+
+            final AMQQueue tempQueue = queue = _vhost.createQueue(this, attributes );
         }
         catch (QpidSecurityException e)
         {
@@ -462,11 +450,6 @@
 
     }
 
-    public void forceEnd()
-    {
-    }
-
-
     @Override
     public UUID getId()
     {
@@ -474,9 +457,9 @@
     }
 
     @Override
-    public AMQConnectionModel getConnectionModel()
+    public Connection_1_0 getConnectionModel()
     {
-        return _connection.getModel();
+        return _connection;
     }
 
     @Override
@@ -489,14 +472,35 @@
     @Override
     public void close()
     {
-        // TODO - required for AMQSessionModel / management initiated closing
+        performCloseTasks();
+        _endpoint.end();
+    }
+
+    protected void performCloseTasks()
+    {
+        if(!_closed.compareAndSet(false, true))
+        {
+            return; // closed flag was already set, so the tasks are not run a second time
+        }
+        final List<Action<? super Session_1_0>> pending = new ArrayList<Action<? super Session_1_0>>(_taskList);
+        _taskList.clear(); // registrations are dropped before the snapshot is executed
+        for(Action<? super Session_1_0> pendingTask : pending)
+        {
+            pendingTask.performAction(this);
+        }
     }
 
 
     @Override
     public void close(AMQConstant cause, String message)
     {
-        // TODO - required for AMQSessionModel
+        performCloseTasks(); // registered close tasks run before the endpoint is ended
+        final Error forcedError = new Error();
+        forcedError.setDescription(message); // only the text is forwarded; 'cause' is not used here
+        forcedError.setCondition(ConnectionError.CONNECTION_FORCED);
+        final End endFrame = new End();
+        endFrame.setError(forcedError);
+        _endpoint.end(endFrame);
     }
 
     @Override
@@ -586,8 +590,7 @@
     @Override
     public int getChannelId()
     {
-        // TODO
-        return 0;
+        return _endpoint.getSendingChannel();
     }
 
     @Override
@@ -609,13 +612,12 @@
                                     connectionId,
                                     getClientID(),
                                     remoteAddress,
-                                    _vhost.getName(), // TODO - virtual host
-                                    0) // TODO - channel)
-            + "] ";
+                                    _vhost.getName(),
+                                    _endpoint.getSendingChannel())  + "] ";
     }
 
     @Override
-    public int compareTo(AMQSessionModel o)
+    public int compareTo(Session_1_0 o)
     {
         return getId().compareTo(o.getId());
     }
@@ -625,4 +627,18 @@
         return _connection;
     }
 
+    @Override
+    public void addDeleteTask(final Action<? super Session_1_0> task)
+    {
+        if(!_closed.get()) // once the closed flag is set, new registrations are silently ignored
+        {
+            _taskList.add(task); // NOTE(review): check-then-add is not atomic with performCloseTasks(); confirm a task added during close may legitimately never run
+        }
+    }
+
+    @Override
+    public void removeDeleteTask(final Action<? super Session_1_0> task)
+    {
+        _taskList.remove(task); // unregisters the task; nothing happens if it is not currently in the list
+    }
 }
diff --git a/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 7d76a0e..3ded20ae 100644
--- a/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -971,36 +971,18 @@
     }
 
     @Override
-    public AuthorizationHolder getAuthorizationHolder()
-    {
-        return null;
-    }
-
-    @Override
-    public void setAuthorizationHolder(final AuthorizationHolder principalHolder)
-    {
-
-    }
-
-    @Override
-    public void setExclusiveOwningSession(final AMQSessionModel owner)
-    {
-
-    }
-
-    @Override
-    public AMQSessionModel getExclusiveOwningSession()
-    {
-        return null;
-    }
-
-    @Override
     public boolean isExclusive()
     {
         return false;
     }
 
     @Override
+    public boolean verifySessionAccess(final AMQSessionModel<?, ?> session)
+    {
+        return true;
+    }
+
+    @Override
     public String getName()
     {
         return MANAGEMENT_NODE_NAME;
diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
index 407da0f..ed5e195 100644
--- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
+++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
@@ -166,7 +166,7 @@
 
     public boolean isAutoDelete()
     {
-        return _exchange.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE;
+        return _exchange.getLifetimePolicy() != LifetimePolicy.PERMANENT;
     }
 
     public TabularData bindings() throws IOException, JMException
diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
index 1365ceb..b44a752 100644
--- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
+++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
@@ -50,6 +50,7 @@
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.QueueNotificationListener;
@@ -194,7 +195,7 @@
 
     public boolean isAutoDelete()
     {
-        return _queue.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE;
+        return _queue.getLifetimePolicy() != LifetimePolicy.PERMANENT;
     }
 
     public Long getMaximumMessageAge()
@@ -264,12 +265,29 @@
 
     public boolean isExclusive()
     {
-        return (Boolean) _queue.getAttribute(Queue.EXCLUSIVE);
+        final Object attribute = _queue.getAttribute(Queue.EXCLUSIVE);
+        return attribute != null && attribute != ExclusivityPolicy.NONE;
     }
 
     public void setExclusive(boolean exclusive)
     {
-        _queue.setAttribute(Queue.EXCLUSIVE, isExclusive(), exclusive);
+        // JMX exposes exclusivity as a boolean, whereas the attribute is compared
+        // against ExclusivityPolicy values: true maps to CONTAINER, false to NONE.
+        final Object current = _queue.getAttribute(Queue.EXCLUSIVE);
+        final boolean alreadyExclusive = current != null && current != ExclusivityPolicy.NONE;
+
+        if(exclusive == alreadyExclusive)
+        {
+            // Nothing to change; in particular an existing non-NONE policy is
+            // left as it is rather than being overwritten with CONTAINER.
+            return;
+        }
+
+        final ExclusivityPolicy desired = exclusive
+                                          ? ExclusivityPolicy.CONTAINER
+                                          : ExclusivityPolicy.NONE;
+        _queue.setAttribute(Queue.EXCLUSIVE, current, desired);
+
     }
 
     public void setAlternateExchange(String exchangeName) throws OperationsException
diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
index 2c88f83..c39c3f7 100644
--- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
+++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
@@ -181,8 +181,13 @@
             throws IOException, JMException
     {
         final Map<String, Object> createArgs = processNewQueueArguments(queueName, owner, originalArguments);
-        getConfiguredObject().createQueue(queueName, State.ACTIVE, durable, false, LifetimePolicy.PERMANENT, 0l,
-                QueueArgumentsConverter.convertWireArgsToModel(createArgs));
+
+        final Map<String, Object> attributes = QueueArgumentsConverter.convertWireArgsToModel(createArgs);
+        attributes.put(Queue.NAME, queueName);
+        attributes.put(Queue.DURABLE, durable);
+        attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
+
+        getConfiguredObject().createQueue(attributes);
     }
 
 
diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
index f94c206..2874168 100644
--- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
+++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
@@ -40,6 +40,7 @@
 import org.apache.qpid.server.jmx.mbeans.QueueMBean.GetMessageVisitor;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Statistics;
@@ -154,7 +155,7 @@
 
     public void testIsAutoDelete() throws Exception
     {
-        when(_mockQueue.getLifetimePolicy()).thenReturn(LifetimePolicy.AUTO_DELETE);
+        when(_mockQueue.getLifetimePolicy()).thenReturn(LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
         assertTrue(_queueMBean.isAutoDelete());
     }
 
@@ -224,22 +225,31 @@
         testSetAttribute("flowResumeCapacity", Queue.QUEUE_FLOW_RESUME_SIZE_BYTES,1048576l , 2097152l);
     }
 
+
+    /**********  Other attributes **********/
+
+
     public void testIsExclusive() throws Exception
     {
-        assertAttribute("exclusive", Boolean.TRUE, Queue.EXCLUSIVE);
+        when(_mockQueue.getAttribute(Queue.EXCLUSIVE)).thenReturn(ExclusivityPolicy.CONTAINER);
+        MBeanTestUtils.assertMBeanAttribute(_queueMBean, "exclusive", true);
     }
 
     public void testIsNotExclusive() throws Exception
     {
-        assertAttribute("exclusive", Boolean.FALSE, Queue.EXCLUSIVE);
+        when(_mockQueue.getAttribute(Queue.EXCLUSIVE)).thenReturn(ExclusivityPolicy.NONE);
+        MBeanTestUtils.assertMBeanAttribute(_queueMBean, "exclusive", false);
     }
 
     public void testSetExclusive() throws Exception
     {
-        testSetAttribute("exclusive", Queue.EXCLUSIVE, Boolean.FALSE , Boolean.TRUE);
-    }
+        when(_mockQueue.getAttribute(Queue.EXCLUSIVE)).thenReturn(ExclusivityPolicy.NONE);
 
-    /**********  Other attributes **********/
+        MBeanTestUtils.setMBeanAttribute(_queueMBean, "exclusive", Boolean.TRUE);
+
+        verify(_mockQueue).setAttribute(Queue.EXCLUSIVE, ExclusivityPolicy.NONE, ExclusivityPolicy.CONTAINER);
+
+    }
 
     public void testGetAlternateExchange()
     {
diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java
index 4240dd5..2dc2cb8 100644
--- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java
+++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java
@@ -39,6 +39,7 @@
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.mockito.ArgumentCaptor;
 
 public class VirtualHostManagerMBeanTest extends TestCase
 {
@@ -68,8 +69,15 @@
     public void testCreateQueueWithNoOwner() throws Exception
     {
         _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, null, true);
+        ArgumentCaptor<Map> argsCaptor = ArgumentCaptor.forClass(Map.class);
 
-        verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, EMPTY_ARGUMENT_MAP);
+        verify(_mockVirtualHost).createQueue(argsCaptor.capture());
+
+        Map actualAttributes = argsCaptor.getValue();
+        assertEquals(TEST_QUEUE_NAME, actualAttributes.get(Queue.NAME));
+        assertEquals(Boolean.TRUE,actualAttributes.get(Queue.DURABLE));
+        assertEquals(null,actualAttributes.get(Queue.OWNER));
+
     }
 
     /**
@@ -79,9 +87,15 @@
     public void testCreateQueueWithOwnerMappedThroughToDescription() throws Exception
     {
         _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true);
+        ArgumentCaptor<Map> argsCaptor = ArgumentCaptor.forClass(Map.class);
 
-        Map<String, Object> expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_OWNER);
-        verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments);
+        verify(_mockVirtualHost).createQueue(argsCaptor.capture());
+
+        Map actualAttributes = argsCaptor.getValue();
+        assertEquals(TEST_QUEUE_NAME,actualAttributes.get(Queue.NAME));
+        assertEquals(Boolean.TRUE,actualAttributes.get(Queue.DURABLE));
+        assertEquals(null,actualAttributes.get(Queue.OWNER));
+        assertEquals(TEST_OWNER, actualAttributes.get(Queue.DESCRIPTION));
     }
 
     public void testCreateQueueWithOwnerAndDescriptionDiscardsOwner() throws Exception
@@ -89,8 +103,15 @@
         Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION);
         _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true, arguments);
 
-        Map<String, Object> expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_DESCRIPTION);
-        verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments);
+        ArgumentCaptor<Map> argsCaptor = ArgumentCaptor.forClass(Map.class);
+
+        verify(_mockVirtualHost).createQueue(argsCaptor.capture());
+
+        Map actualAttributes = argsCaptor.getValue();
+        assertEquals(TEST_QUEUE_NAME,actualAttributes.get(Queue.NAME));
+        assertEquals(Boolean.TRUE,actualAttributes.get(Queue.DURABLE));
+        assertEquals(null,actualAttributes.get(Queue.OWNER));
+        assertEquals(TEST_DESCRIPTION, actualAttributes.get(Queue.DESCRIPTION));
     }
 
     public void testDeleteQueue() throws Exception
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java
index 41d93b5..c6b2c9e 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java
@@ -290,7 +290,7 @@
         session.createQueue(new AMQShortString(queueName),
                             autoDelete, durable, exclusive);
 
-        validateQueueViaJMX(queueName, exclusive ? connection.getClientID() : null, durable, autoDelete);
+        validateQueueViaJMX(queueName, (exclusive && durable &&!isBroker010()) ? connection.getClientID() : null, durable, autoDelete || (exclusive && !isBroker010() && !durable));
     }
 
     /**
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
index bdcdbe2..709d8ae 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -23,7 +23,7 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
+
 import org.apache.commons.configuration.PropertiesConfiguration;
 
 import org.apache.log4j.Logger;
@@ -41,6 +41,9 @@
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.TopicExchange;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v0_8.AMQMessage;
 import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
@@ -51,7 +54,6 @@
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.ConflationQueue;
 import org.apache.qpid.server.queue.StandardQueue;
-import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.BrokerTestHelper;
@@ -539,11 +541,10 @@
         }
     }
 
-    private void setQueueExclusivity(boolean exclusive) throws AMQException
+    private void setQueueExclusivity(boolean exclusive) throws MessageSource.ExistingConsumerPreventsExclusive
     {
         AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName);
-
-        queue.setExclusive(exclusive);
+        queue.setExclusivityPolicy(exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE);
     }
 
     private void validateQueueExclusivityProperty(boolean expected)
@@ -587,7 +588,7 @@
             assertEquals("Queue is not 'simple'", StandardQueue.class, queue.getClass());
         }
 
-        assertEquals("Queue owner is not as expected", queueOwner, queue.getOwner());
+        assertEquals("Queue owner is not as expected", exclusive ? queueOwner : null, queue.getOwner());
         assertEquals("Queue durability is not as expected", durable, queue.isDurable());
         assertEquals("Queue exclusivity is not as expected", exclusive, queue.isExclusive());
     }
@@ -671,7 +672,7 @@
             throws Exception
     {
 
-        Map<String,Object> queueArguments = null;
+        Map<String,Object> queueArguments = new HashMap<String, Object>();
 
         if(usePriority || lastValueQueue)
         {
@@ -680,19 +681,27 @@
 
         if (usePriority)
         {
-            queueArguments = Collections.singletonMap(Queue.PRIORITIES, (Object) DEFAULT_PRIORTY_LEVEL);
+            queueArguments.put(Queue.PRIORITIES, DEFAULT_PRIORTY_LEVEL);
         }
 
         if (lastValueQueue)
         {
-            queueArguments = Collections.singletonMap(Queue.LVQ_KEY, (Object) LVQ_KEY);
+            queueArguments.put(Queue.LVQ_KEY, LVQ_KEY);
         }
 
+        queueArguments.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+        queueArguments.put(Queue.NAME, queueName);
+        queueArguments.put(Queue.DURABLE, durable);
+        queueArguments.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
+        queueArguments.put(Queue.EXCLUSIVE, exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE);
+        if(exclusive && queueOwner != null)
+        {
+            queueArguments.put(Queue.OWNER, queueOwner);
+        }
         AMQQueue queue = null;
 
         //Ideally we would be able to use the QueueDeclareHandler here.
-        queue = getVirtualHost().createQueue(UUIDGenerator.generateRandomUUID(), queueName, durable, queueOwner, false, exclusive,
-                false, queueArguments);
+        queue = getVirtualHost().createQueue(null, queueArguments);
 
         validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue);
 
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
index 3c15a45..9319749 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
@@ -130,9 +130,10 @@
 
     public void testExclusiveQueueHasJmsClientIdAsOwner() throws Exception
     {
-        Queue tmpQueue = _session.createTemporaryQueue();
+        final String subName = "testOwner";
+        _session.createDurableSubscriber(getTestTopic(), subName);
 
-        final String queueName = tmpQueue.getQueueName();
+        final String queueName = _connection.getClientID() + ":" + subName;
 
         final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
         assertNotNull(_connection.getClientID());
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
index 9028130..1b27ee7 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
@@ -31,16 +31,7 @@
 import javax.jms.JMSException;
 
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Connection;
-import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.*;
 import org.apache.qpid.test.utils.TestBrokerConfiguration;
 
 public class Asserts
@@ -100,7 +91,8 @@
         assertEquals("Unexpected value of queue attribute " + Queue.QUEUE_TYPE, queueType, queueData.get(Queue.QUEUE_TYPE));
         if (expectedAttributes == null)
         {
-            assertEquals("Unexpected value of queue attribute " + Queue.EXCLUSIVE, Boolean.FALSE, queueData.get(Queue.EXCLUSIVE));
+            assertEquals("Unexpected value of queue attribute " + Queue.EXCLUSIVE,
+                         ExclusivityPolicy.NONE.name(), queueData.get(Queue.EXCLUSIVE));
             assertEquals("Unexpected value of queue attribute " + Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0,
                     queueData.get(Queue.MAXIMUM_DELIVERY_ATTEMPTS));
             assertEquals("Unexpected value of queue attribute " + Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, 0,
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java
index fec516b..3833af3 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java
@@ -145,7 +145,6 @@
         attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 40000);
         attributes.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 50000);
         attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
-        attributes.put(Queue.EXCLUSIVE, true);
 
         responseCode = getRestTestHelper().submitRequest("/rest/queue/test/" + queueName, "PUT", attributes);
         assertEquals("Setting of queue attribites should be allowed", 200, responseCode);
@@ -158,7 +157,6 @@
         assertEquals("Unexpected " + Queue.ALERT_THRESHOLD_MESSAGE_SIZE, 30000, queueData.get(Queue.ALERT_THRESHOLD_MESSAGE_SIZE) );
         assertEquals("Unexpected " + Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 40000, queueData.get(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES) );
         assertEquals("Unexpected " + Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 50000, queueData.get(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES) );
-        assertEquals("Unexpected " + Queue.EXCLUSIVE, true, queueData.get(Queue.EXCLUSIVE) );
     }
 
     public void testPutCreateBinding() throws Exception
@@ -217,7 +215,7 @@
 
         assertEquals("Unexpected binding attribute " + Consumer.NAME, "1", consumer.get(Consumer.NAME));
         assertEquals("Unexpected binding attribute " + Consumer.DURABLE, Boolean.FALSE, consumer.get(Consumer.DURABLE));
-        assertEquals("Unexpected binding attribute " + Consumer.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.name(),
+        assertEquals("Unexpected binding attribute " + Consumer.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END.name(),
                 consumer.get(Consumer.LIFETIME_POLICY));
         assertEquals("Unexpected binding attribute " + Consumer.DISTRIBUTION_MODE, "MOVE",
                 consumer.get(Consumer.DISTRIBUTION_MODE));
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
index 96ca0c2..d7b17fd 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
@@ -36,7 +36,7 @@
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.ConflationQueue;
 import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
 import org.apache.qpid.test.utils.TestFileUtils;
 import org.apache.qpid.util.FileUtils;
@@ -291,7 +291,7 @@
 
         Asserts.assertQueue(queueName , "lvq", lvqQueue);
         assertEquals("Unexpected value of queue attribute " + Queue.DURABLE, Boolean.TRUE, lvqQueue.get(Queue.DURABLE));
-        assertEquals("Unexpected lvq key attribute", AMQQueueFactory.QPID_DEFAULT_LVQ_KEY, lvqQueue.get(Queue.LVQ_KEY));
+        assertEquals("Unexpected lvq key attribute", ConflationQueue.DEFAULT_LVQ_KEY, lvqQueue.get(Queue.LVQ_KEY));
     }
 
     public void testPutCreateSortedQueueWithoutKey() throws Exception
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
index 6356b17..6473a77 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
@@ -321,14 +321,7 @@
         MessageConsumer consumer;
         if(durableSub)
         {
-            if (isBroker010())
-            {
-                consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX));
-            }
-            else
-            {
-                consumer = clientSession.createDurableSubscriber(clientSession.createTopic(destName), getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
-            }
+            consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX));
         }
         else
         {
diff --git a/java/test-profiles/Java010Excludes b/java/test-profiles/Java010Excludes
index 67c2fcd..cbdb0c1 100755
--- a/java/test-profiles/Java010Excludes
+++ b/java/test-profiles/Java010Excludes
@@ -35,6 +35,9 @@
 org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartConsumerFlowStarted
 org.apache.qpid.server.logging.ConsumerLoggingTest#testSubscriptionSuspend
 org.apache.qpid.server.logging.ChannelLoggingTest#testChannelClosedOnQueueArgumentsMismatch
+// 0-10 exclusive queues are exclusive to the session, not to the container (so the exclusive owner is not the client-id)
+org.apache.qpid.server.logging.DurableQueueLoggingTest#testQueueCreateDurableExclusive
+org.apache.qpid.server.logging.TransientQueueLoggingTest#testQueueCreateDurableExclusive
 
 // 0-10 is not supported by the MethodRegistry
 org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
@@ -69,3 +72,6 @@
 
 // Java 0-10 client does not support re-binding the queue to the same exchange
 org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange
+
+// Java 0-10 exclusive queues are owned by sessions, not by the client-id, so the owner is not meaningful from JMX
+org.apache.qpid.systest.management.jmx.QueueManagementTest#testExclusiveQueueHasJmsClientIdAsOwner
diff --git a/java/test-profiles/JavaPre010Excludes b/java/test-profiles/JavaPre010Excludes
index 7f9a20e..ff8bcfc 100644
--- a/java/test-profiles/JavaPre010Excludes
+++ b/java/test-profiles/JavaPre010Excludes
@@ -78,3 +78,8 @@
 
 // QPID-3396
 org.apache.qpid.test.unit.client.connection.ConnectionTest#testExceptionWhenUserPassIsRequired
+
+// Non-durable exclusive queues are exclusive to a connection (and thus auto-delete, or at least
+// not permanent); their owner is the connection rather than the container (client-id)
+org.apache.qpid.server.logging.TransientQueueLoggingTest#testQueueCreateDurableExclusive
+