QPID-3401 Checking the proposed changes into a branch to preserve history & continue working until such time it's accepted into trunk.


git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor@1183532 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/Drain.java b/java/client/example/src/main/java/org/apache/qpid/example/Drain.java
index b43031a..e2db7c3 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/Drain.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/Drain.java
@@ -26,9 +26,7 @@
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
 
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-import org.slf4j.Logger;
+import org.apache.qpid.client.AddressBasedDestination;
 
 public class Drain extends OptionParser
 {
@@ -66,7 +64,7 @@
         Connection con = createConnection();
         con.start();
         Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE);     
-        Destination dest = new AMQAnyDestination(address);
+        Destination dest = new AddressBasedDestination(address);
         MessageConsumer consumer = ssn.createConsumer(dest);
         Message msg;
         
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/MapReceiver.java b/java/client/example/src/main/java/org/apache/qpid/example/MapReceiver.java
index 89db04f..ae71a12 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/MapReceiver.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/MapReceiver.java
@@ -27,8 +27,8 @@
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
 
-import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AddressBasedDestination;
 
 
 public class MapReceiver {
@@ -41,7 +41,7 @@
         connection.start();
         
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}");
+        Destination queue = new AddressBasedDestination("message_queue; {create: always}");
         MessageConsumer consumer = session.createConsumer(queue);
 
         MapMessage m = (MapMessage)consumer.receive();
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/MapSender.java b/java/client/example/src/main/java/org/apache/qpid/example/MapSender.java
index 0ce9383..fa85c00 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/MapSender.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/MapSender.java
@@ -33,8 +33,8 @@
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
-import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AddressBasedDestination;
 
 
 public class MapSender {
@@ -45,7 +45,7 @@
             new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'");
         
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}");
+        Destination queue = new AddressBasedDestination("ADDR:message_queue; {create: always}");
         MessageProducer producer = session.createProducer(queue);
 
         MapMessage m = session.createMapMessage();
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/Spout.java b/java/client/example/src/main/java/org/apache/qpid/example/Spout.java
index 5da319a6..6e7b7c8 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/Spout.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/Spout.java
@@ -27,7 +27,7 @@
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
-import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AddressBasedDestination;
 
 public class Spout extends OptionParser
 {
@@ -87,7 +87,7 @@
         Connection con = createConnection();
         con.start();
         Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE);     
-        Destination dest = new AMQAnyDestination(address);
+        Destination dest = new AddressBasedDestination(address);
         MessageProducer producer = ssn.createProducer(dest);
         
         int count = Integer.parseInt(getOp(COUNT));
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
deleted file mode 100644
index 999b222..0000000
--- a/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client;
-
-import java.net.URISyntaxException;
-
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.Topic;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.messaging.Address;
-import org.apache.qpid.url.BindingURL;
-
-/**
- * In order to support JMS 1.0 the Qpid implementation maps the 
- * direct exchange to JMS Queue and topic exchange to JMS Topic.
- * 
- * The JMS 1.1 spec provides a javax.Destination as an abstraction
- * to represent any type of destination. 
- * The abstract class AMQDestination has most of the functionality
- * to support any destination defined in AMQP 0-10 spec.
- */
-public class AMQAnyDestination extends AMQDestination implements Queue, Topic
-{    
-    public AMQAnyDestination(BindingURL binding)
-    {
-        super(binding);
-    }
-
-    public AMQAnyDestination(String str) throws URISyntaxException
-    {
-        super(str);
-    }
-    
-    public AMQAnyDestination(Address addr) throws Exception
-    {
-        super(addr);
-    }
-    
-    public AMQAnyDestination(AMQShortString exchangeName,AMQShortString exchangeClass,
-                             AMQShortString routingKey,boolean isExclusive, 
-                             boolean isAutoDelete, AMQShortString queueName, 
-                             boolean isDurable, AMQShortString[] bindingKeys)
-    {
-        super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable, bindingKeys);
-    }
-
-    @Override
-    public boolean isNameRequired()
-    {
-        return getAMQQueueName() == null;
-    }
-
-    public String getTopicName() throws JMSException
-    {
-        if (getRoutingKey() != null)
-        {
-            return getRoutingKey().asString();
-        }
-        else if (getSubject() != null)
-        {
-            return getSubject();
-        }
-        else
-        {
-            return null;
-        }
-    }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index acd46da..64b3623 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -20,22 +20,16 @@
  */
 package org.apache.qpid.client;
 
-import java.net.URISyntaxException;
-import java.util.Map;
-
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.naming.NamingException;
 import javax.naming.Reference;
 import javax.naming.Referenceable;
 import javax.naming.StringRefAddr;
 
-import org.apache.qpid.client.messaging.address.AddressHelper;
-import org.apache.qpid.client.messaging.address.Link;
-import org.apache.qpid.client.messaging.address.Node;
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.messaging.Address;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.url.URLHelper;
@@ -57,10 +51,8 @@
 
     protected boolean _isAutoDelete;
 
-    private boolean _browseOnly;
+    protected boolean _browseOnly;
     
-    private boolean _isAddressResolved;
-
     private AMQShortString _queueName;
 
     private AMQShortString _routingKey;
@@ -99,39 +91,12 @@
                                                  " should be one of {BURL|ADDR}");
           }
       }
-    } 
-    
-    public enum AddressOption { 
-      ALWAYS, NEVER, SENDER, RECEIVER; 
-        
-      public static AddressOption getOption(String str)
-      {
-          if ("always".equals(str)) return ALWAYS;
-          else if ("never".equals(str)) return NEVER;
-          else if ("sender".equals(str)) return SENDER;
-          else if ("receiver".equals(str)) return RECEIVER;
-          else throw new IllegalArgumentException(str + " is not an allowed value");
-      }
-    }
+    }    
     
     protected final static DestSyntax defaultDestSyntax;
     
-    protected DestSyntax _destSyntax = DestSyntax.ADDR;
-
-    protected AddressHelper _addrHelper;
-    protected Address _address;
-    protected int _addressType = AMQDestination.UNKNOWN_TYPE;
-    protected String _name;
-    protected String _subject;
-    protected AddressOption _create = AddressOption.NEVER;
-    protected AddressOption _assert = AddressOption.NEVER;
-    protected AddressOption _delete = AddressOption.NEVER; 
-
-    protected Node _targetNode;
-    protected Node _sourceNode;
-    protected Link _targetLink;
-    protected Link _link;    
-        
+    protected DestSyntax _destSyntax = DestSyntax.BURL;
+            
     // ----- / Fields required to support new address syntax -------
     
     static
@@ -148,14 +113,6 @@
         return defaultDestSyntax;
     }
 
-    protected AMQDestination(Address address) throws Exception
-    {
-        this._address = address;
-        getInfoFromAddress();
-        _destSyntax = DestSyntax.ADDR;
-        _logger.debug("Based on " + address + " the selected destination syntax is " + _destSyntax);
-    }
-    
     public  static DestSyntax getDestType(String str)
     {
         if (str.startsWith("BURL:") || 
@@ -181,30 +138,8 @@
         }
     }
     
-    protected AMQDestination(String str) throws URISyntaxException
-    {
-        _destSyntax = getDestType(str);
-        str = stripSyntaxPrefix(str);
-        if (_destSyntax == DestSyntax.BURL)
-        {    
-            getInfoFromBindingURL(new AMQBindingURL(str));            
-        }
-        else
-        {
-            this._address = createAddressFromString(str);
-            try
-            {
-                getInfoFromAddress();
-            }
-            catch(Exception e)
-            {
-                URISyntaxException ex = new URISyntaxException(str,"Error parsing address");
-                ex.initCause(e);
-                throw ex;
-            }
-        }
-        _logger.debug("Based on " + str + " the selected destination syntax is " + _destSyntax);
-    }
+    // Used by the AddressBasedDestination impl
+    protected AMQDestination() {}
     
     //retained for legacy support
     protected AMQDestination(BindingURL binding)
@@ -400,15 +335,7 @@
 
     public String toString()
     {
-        if (_destSyntax == DestSyntax.BURL)
-        {
-            return toURL();
-        }
-        else
-        {
-            return _address.toString();
-        }
-
+        return toURL();
     }
 
     public boolean isCheckedForQueueBinding()
@@ -559,7 +486,7 @@
                 null);          // factory location
     }
 
-    public static Destination createDestination(BindingURL binding)
+    public static Destination createDestination(BindingURL binding) throws JMSException
     {
         AMQShortString type = binding.getExchangeClass();
 
@@ -575,9 +502,13 @@
         {
             return new AMQHeadersExchange(binding);
         }
+        else if (type.equals(ExchangeDefaults.FANOUT_EXCHANGE_CLASS))
+        {
+            return new AMQQueue(binding);
+        }
         else
         {
-            return new AMQAnyDestination(binding);
+            throw new JMSException("Unsupported exchange type");
         }
     }
 
@@ -591,249 +522,36 @@
          }
          else
          {
-             Address address = createAddressFromString(str);
-             return new AMQAnyDestination(address);
+             return new AddressBasedDestination(str);
          }
     }
     
-    // ----- new address syntax -----------
-    
-    public static class Binding
-    {
-        String exchange;
-        String bindingKey;
-        String queue;
-        Map<String,Object> args;
-        
-        public Binding(String exchange,
-                       String queue,
-                       String bindingKey,
-                       Map<String,Object> args)
-        {
-            this.exchange = exchange;
-            this.queue = queue;
-            this.bindingKey = bindingKey;
-            this.args = args;
-        }
-        
-        public String getExchange() 
-        {
-            return exchange;
-        }
-
-        public String getQueue() 
-        {
-            return queue;
-        }
-        
-        public String getBindingKey() 
-        {
-            return bindingKey;
-        }
-        
-        public Map<String, Object> getArgs() 
-        {
-            return args;
-        }
-    }
-    
-    public Address getAddress() {
-        return _address;
-    }
-    
-    protected void setAddress(Address addr) {
-        _address = addr;
-    }
-    
-    public int getAddressType(){
-        return _addressType;
-    }
-
-    public void setAddressType(int addressType){
-        _addressType = addressType;
-    }
-    
-    public String getAddressName() {
-        return _name;
-    }
-    
-    public void setAddressName(String name){
-        _name = name;
-    }
-
-    public String getSubject() {
-        return _subject;
-    }
-
-    public void setSubject(String subject) {
-        _subject = subject;
-    }
-    
-    public AddressOption getCreate() {
-        return _create;
-    }
-    
-    public void setCreate(AddressOption option) {
-        _create = option;
-    }
-   
-    public AddressOption getAssert() {
-        return _assert;
-    }
-
-    public void setAssert(AddressOption option) {
-        _assert = option;
-    }
-    
-    public AddressOption getDelete() {
-        return _delete;
-    }
-
-    public void setDelete(AddressOption option) {
-        _delete = option;
-    }
-
-    public Node getTargetNode()
-    {
-        return _targetNode;
-    }
-
-    public void setTargetNode(Node node)
-    {
-        _targetNode = node;
-    }
-
-    public Node getSourceNode()
-    {
-        return _sourceNode;
-    }
-
-    public void setSourceNode(Node node)
-    {
-        _sourceNode = node;
-    }
-
-    public Link getLink()
-    {
-        return _link;
-    }
-
-    public void setLink(Link link)
-    {
-        _link = link;
-    }
-    
-    public void setExchangeName(AMQShortString name)
-    {
-        this._exchangeName = name;
-    }
-    
-    public void setExchangeClass(AMQShortString type)
-    {
-        this._exchangeClass = type;
-    }
-    
-    public void setRoutingKey(AMQShortString rk)
-    {
-        this._routingKey = rk;
-    }
-    
-    public boolean isAddressResolved()
-    {
-        return _isAddressResolved;
-    }
-
-    public void setAddressResolved(boolean addressResolved)
-    {
-        _isAddressResolved = addressResolved;
-    }
-    
-    private static Address createAddressFromString(String str)
-    {
-        return Address.parse(str);
-    }
-    
-    private void getInfoFromAddress() throws Exception
-    {
-        _name = _address.getName();
-        _subject = _address.getSubject();
-        
-        _addrHelper = new AddressHelper(_address);
-        
-        _create = _addrHelper.getCreate() != null ?
-                 AddressOption.getOption(_addrHelper.getCreate()):AddressOption.NEVER;
-                
-        _assert = _addrHelper.getAssert() != null ?
-                 AddressOption.getOption(_addrHelper.getAssert()):AddressOption.NEVER;
-
-        _delete = _addrHelper.getDelete() != null ?
-                 AddressOption.getOption(_addrHelper.getDelete()):AddressOption.NEVER;
-                 
-        _browseOnly = _addrHelper.isBrowseOnly();
-                        
-        _addressType = _addrHelper.getTargetNodeType();         
-        _targetNode =  _addrHelper.getTargetNode(_addressType);
-        _sourceNode = _addrHelper.getSourceNode(_addressType);
-        _link = _addrHelper.getLink();       
-    }
-    
-    // This method is needed if we didn't know the node type at the beginning.
-    // Therefore we have to query the broker to figure out the type.
-    // Once the type is known we look for the necessary properties.
-    public void rebuildTargetAndSourceNodes(int addressType)
-    {
-        _targetNode =  _addrHelper.getTargetNode(addressType);
-        _sourceNode =  _addrHelper.getSourceNode(addressType);
-    }
-    
-    // ----- / new address syntax -----------    
-
     public boolean isBrowseOnly()
     {
         return _browseOnly;
     }
-    
-    private void setBrowseOnly(boolean b)
+  
+    public long getConsumerCapacity(AMQSession ssn) throws Exception
     {
-        _browseOnly = b;
+        if (ssn.prefetch())
+        {
+            return ssn.getAMQConnection().getMaxPrefetch();
+        }
+        else
+        {
+            return 0;
+        }
     }
     
-    public AMQDestination copyDestination()
+    public long getProducerCapacity(AMQSession ssn) throws Exception
     {
-        AMQDestination dest = 
-            new AMQAnyDestination(_exchangeName,
-                                  _exchangeClass,
-                                  _routingKey,
-                                  _isExclusive, 
-                                  _isAutoDelete,
-                                  _queueName, 
-                                  _isDurable,
-                                  _bindingKeys
-                                  );
-        
-        dest.setDestSyntax(_destSyntax);
-        dest.setAddress(_address);
-        dest.setAddressName(_name);
-        dest.setSubject(_subject);
-        dest.setCreate(_create); 
-        dest.setAssert(_assert); 
-        dest.setDelete(_create); 
-        dest.setBrowseOnly(_browseOnly);
-        dest.setAddressType(_addressType);
-        dest.setTargetNode(_targetNode);
-        dest.setSourceNode(_sourceNode);
-        dest.setLink(_link);
-        dest.setAddressResolved(_isAddressResolved);
-        return dest;        
-    }
-    
-    protected void setAutoDelete(boolean b)
-    {
-        _isAutoDelete = b;
-    }
-    
-    protected void setDurable(boolean b)
-    {
-        _isDurable = b;
+        if (ssn.prefetch())
+        {
+            return ssn.getAMQConnection().getMaxPrefetch();
+        }
+        else
+        {
+            return 0;
+        }
     }
 }
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
index 5bd1bd6..a1a5a3c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
@@ -30,12 +30,6 @@
 
 public class AMQQueue extends AMQDestination implements Queue
 {
-
-    public AMQQueue(String address) throws URISyntaxException
-    {
-        super(address);
-    }
-    
     /**
      * Create a reference to a non temporary queue using a BindingURL object.
      * Note this does not actually imply the queue exists.
@@ -149,6 +143,12 @@
         super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, exclusive,
               autoDelete, queueName, durable, bindingKeys);
     }
+    
+    public AMQQueue(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable,AMQShortString[] bindingKeys)
+    {
+        super(exchangeName, exchangeClass, routingKey, exclusive,
+              autoDelete, queueName, durable, bindingKeys);
+    }
 
     public AMQShortString getRoutingKey()
     {
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index d34290e..c058ceb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -98,6 +98,7 @@
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.SessionException;
 import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.url.AMQBindingURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -976,7 +977,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
+        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, isTopic(destination), null, null,
                                   isBrowseOnlyDestination(destination), false);
     }
 
@@ -984,7 +985,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
+        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, isTopic(destination),
                                   messageSelector, null, isBrowseOnlyDestination(destination), false);
     }
 
@@ -993,7 +994,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
+        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, isTopic(destination),
                                   messageSelector, null, isBrowseOnlyDestination(destination), false);
     }
 
@@ -1034,30 +1035,16 @@
         checkNotClosed();
         Topic origTopic = checkValidTopic(topic, true);
         
-        AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
-        if (dest.getDestSyntax() == DestSyntax.ADDR &&
-            !dest.isAddressResolved())
+        // The check valid Topic will throw an exception if topic is not an instanceof 
+        // AMQTopic or AddressBasedTopc.
+        Topic dest;
+        if (topic instanceof AMQTopic)
         {
-            try
-            {
-                handleAddressBasedDestination(dest,false,true);
-                if (dest.getAddressType() !=  AMQDestination.TOPIC_TYPE)
-                {
-                    throw new JMSException("Durable subscribers can only be created for Topics");
-                }
-                dest.getSourceNode().setDurable(true);
-            }
-            catch(AMQException e)
-            {
-                JMSException ex = new JMSException("Error when verifying destination");
-                ex.initCause(e);
-                ex.setLinkedException(e);
-                throw ex;
-            }
-            catch(TransportException e)
-            {
-                throw toJMSException("Error when verifying destination", e);
-            }
+            dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
+        }
+        else
+        {
+            dest = AddressBasedTopic.createDurableTopic(origTopic, name, _connection, this);
         }
         
         String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
@@ -1071,7 +1058,7 @@
             if (subscriber == null)
             {
                 // After the address is resolved routing key will not be null.
-                AMQShortString topicName = dest.getRoutingKey();
+                AMQShortString topicName = new AMQShortString(dest.getTopicName());
                 
                 if (_strictAMQP)
                 {
@@ -1085,7 +1072,14 @@
                                         + "' for creation durableSubscriber. Requesting queue deletion regardless.");
                     }
 
-                    deleteQueue(dest.getAMQQueueName());
+                    if (topic instanceof AMQTopic)
+                    {
+                        deleteQueue(((AMQTopic)dest).getAMQQueueName());
+                    }
+                    else
+                    {
+                        ((AddressBasedTopic)topic).deleteSubscription(this);
+                    }
                 }
                 else
                 {
@@ -1099,13 +1093,34 @@
                     
                     // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
                     // says we must trash the subscription.
-                    boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());
+                    AMQShortString exchangeName;
+                    AMQShortString queueName;
+                                        
+                    if (topic instanceof AMQTopic)
+                    {
+                        exchangeName = ((AMQTopic)dest).getExchangeName();
+                        queueName = ((AMQTopic)dest).getAMQQueueName();
+                                            
+                    }
+                    else
+                    {
+                        exchangeName = new AMQShortString(((AddressBasedTopic)topic).getAddress().getName());
+                        queueName = new AMQShortString(((AddressBasedTopic)topic).getAddress().getSubject());
+                    }                    
+                    boolean isQueueBound = isQueueBound(exchangeName, queueName);
                     boolean isQueueBoundForTopicAndSelector = 
-                                isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args);
+                        isQueueBound(exchangeName.asString(), queueName.asString(), topicName.asString(), args);
 
                     if (isQueueBound && !isQueueBoundForTopicAndSelector)
                     {
-                        deleteQueue(dest.getAMQQueueName());
+                        if (topic instanceof AMQTopic)
+                        {
+                            deleteQueue(((AMQTopic)dest).getAMQQueueName());
+                        }
+                        else
+                        {
+                            ((AddressBasedTopic)topic).deleteSubscription(this);
+                        }
                     }
                 }
             }
@@ -1217,36 +1232,34 @@
     public Queue createQueue(String queueName) throws JMSException
     {
         checkNotClosed();
-        try
+        DestSyntax syntax = AMQDestination.getDestType(queueName);
+        queueName = AMQDestination.stripSyntaxPrefix(queueName);
+        if (syntax == AMQDestination.DestSyntax.BURL)
         {
-            if (queueName.indexOf('/') == -1 && queueName.indexOf(';') == -1)
+            if (queueName.indexOf('/') == -1)
             {
-                DestSyntax syntax = AMQDestination.getDestType(queueName);
-                if (syntax == AMQDestination.DestSyntax.BURL)
-                {
-                    // For testing we may want to use the prefix
-                    return new AMQQueue(getDefaultQueueExchangeName(), 
-                                        new AMQShortString(AMQDestination.stripSyntaxPrefix(queueName)));
-                }
-                else
-                {
-                    AMQQueue queue = new AMQQueue(queueName);
-                    return queue;
-                    
-                }
+                return new AMQQueue(getDefaultQueueExchangeName(), 
+                                   new AMQShortString(queueName));                
             }
             else
             {
-                return new AMQQueue(queueName);            
+                try
+                {
+                    return new AMQQueue(new AMQBindingURL(queueName));
+                }    
+                catch (URISyntaxException urlse)
+                {
+                    _logger.error("", urlse);
+                    JMSException jmse = new JMSException(urlse.getReason());
+                    jmse.setLinkedException(urlse);
+                    jmse.initCause(urlse);
+                    throw jmse;
+                }
             }
         }
-        catch (URISyntaxException urlse)
+        else
         {
-            _logger.error("", urlse);
-            JMSException jmse = new JMSException(urlse.getReason());
-            jmse.setLinkedException(urlse);
-            jmse.initCause(urlse);
-            throw jmse;
+            return new AddressBasedQueue(queueName);
         }
 
     }
@@ -1511,36 +1524,39 @@
     public Topic createTopic(String topicName) throws JMSException
     {
         checkNotClosed();
-        try
+        DestSyntax syntax = AMQDestination.getDestType(topicName);
+        topicName = AMQDestination.stripSyntaxPrefix(topicName);
+        if (syntax == AMQDestination.DestSyntax.BURL)
         {
-            if (topicName.indexOf('/') == -1 && topicName.indexOf(';') == -1)
+            if (topicName.indexOf('/') == -1)
             {
-                DestSyntax syntax = AMQDestination.getDestType(topicName);
-                // for testing we may want to use the prefix to indicate our choice.
-                topicName = AMQDestination.stripSyntaxPrefix(topicName);
-                if (syntax == AMQDestination.DestSyntax.BURL)
-                {
-                    return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
-                }
-                else
-                {
-                    return new AMQTopic("ADDR:" + getDefaultTopicExchangeName() + "/" + topicName);
-                }
+                return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
             }
             else
             {
-                return new AMQTopic(topicName);            
+                try
+                {
+                    return new AMQTopic(new AMQBindingURL(topicName));
+                }
+                catch (URISyntaxException urlse)
+                {
+                    _logger.error("", urlse);
+                    JMSException jmse = new JMSException(urlse.getReason());
+                    jmse.setLinkedException(urlse);
+                    jmse.initCause(urlse);
+                    throw jmse;
+                }
             }
-        
         }
-        catch (URISyntaxException urlse)
+        else
         {
-            _logger.error("", urlse);
-            JMSException jmse = new JMSException(urlse.getReason());
-            jmse.setLinkedException(urlse);
-            jmse.initCause(urlse);
-            throw jmse;
-        }
+            if (topicName.indexOf('/') == -1 && topicName.indexOf(';') == -1)
+            {
+                topicName = getDefaultTopicExchangeName() + "/" + topicName;
+            }
+            
+            return new AddressBasedTopic(topicName);
+        }        
     }
 
     public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
@@ -2463,7 +2479,7 @@
                 ("Cannot create a durable subscription with a temporary topic: " + topic);
         }
 
-        if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic))
+        if (!(topic instanceof AMQTopic || topic instanceof AddressBasedTopic))
         {
             throw new javax.jms.InvalidDestinationException(
                     "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
@@ -2872,11 +2888,7 @@
 
         AMQProtocolHandler protocolHandler = getProtocolHandler();
 
-        if (amqd.getDestSyntax() == DestSyntax.ADDR)
-        {
-            handleAddressBasedDestination(amqd,true,nowait);            
-        }
-        else
+        if (amqd.getDestSyntax() == DestSyntax.BURL)
         {
             if (DECLARE_EXCHANGES)
             {
@@ -2932,10 +2944,6 @@
             throw new AMQException(null, "Fail-over exception interrupted basic consume.", e);
         }
     }
-
-    public abstract void handleAddressBasedDestination(AMQDestination dest, 
-                                                       boolean isConsumer,
-                                                       boolean noWait) throws AMQException;
     
     private void registerProducer(long producerId, MessageProducer producer)
     {
@@ -3565,4 +3573,16 @@
             _logger.debug("Rollback mark is set to " + _rollbackMark.get());
         }
     }
+    
+    private boolean isTopic(Destination dest)
+    {
+        if (dest instanceof AMQDestination)
+        {
+           return ((AMQDestination)dest).isTopic();
+        }
+        else
+        {
+           return dest instanceof Topic;
+        }
+    }
 }
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 3812e61..40c9113 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -23,11 +23,8 @@
 import static org.apache.qpid.transport.Option.UNRELIABLE;
 
 import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -37,8 +34,6 @@
 import javax.jms.JMSException;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
-import org.apache.qpid.client.AMQDestination.Binding;
 import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverNoopSupport;
@@ -47,18 +42,15 @@
 import org.apache.qpid.client.message.FieldTableSupport;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage_0_10;
-import org.apache.qpid.client.messaging.address.Link;
-import org.apache.qpid.client.messaging.address.Link.Reliability;
-import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.messaging.address.AddressResolver;
+import org.apache.qpid.messaging.address.amqp_0_10.SubscriptionSettings_0_10;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.transport.ExchangeBoundResult;
 import org.apache.qpid.transport.ExchangeQueryResult;
-import org.apache.qpid.transport.ExecutionErrorCode;
 import org.apache.qpid.transport.ExecutionException;
 import org.apache.qpid.transport.MessageAcceptMode;
 import org.apache.qpid.transport.MessageAcquireMode;
@@ -143,7 +135,8 @@
      * USed to store the range of in tx messages
      */
     private RangeSet _txRangeSet = new RangeSet();
-    private int _txSize = 0;    
+    private int _txSize = 0;
+    private AddressResolver addressResolver;
     //--- constructors
 
     /**
@@ -345,31 +338,10 @@
         }
         else
         {
-            List<Binding> bindings = new ArrayList<Binding>();
-            bindings.addAll(destination.getSourceNode().getBindings());
-            bindings.addAll(destination.getTargetNode().getBindings());
-            
-            String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ?
-                                     destination.getAddressName(): "amq.topic";
-            
-            for (Binding binding: bindings)
-            {
-                String queue = binding.getQueue() == null?
-                                   queueName.asString(): binding.getQueue();
-                                   
-               String exchange = binding.getExchange() == null ? 
-                                 defaultExchange :
-                                 binding.getExchange();
-                        
-                _logger.debug("Binding queue : " + queue + 
-                              " exchange: " + exchange + 
-                              " using binding key " + binding.getBindingKey() + 
-                              " with args " + printMap(binding.getArgs()));
-                getQpidSession().exchangeBind(queue, 
-                                              exchange,
-                                              binding.getBindingKey(),
-                                              binding.getArgs()); 
-            }
+            // do nothing atm
+            // when creating a producer or consumer the create/assert method should be invokved
+            // for consumers create and delete subscriptions should be called in the constructor and close()
+            // when closing them the delete method should be invoked
         }
         
         if (!nowait)
@@ -573,45 +545,51 @@
     {        
         boolean preAcquire;
         
-        long capacity = getCapacity(consumer.getDestination());
+        long capacity;
+        try 
+        {
+            capacity = consumer.getDestination().getConsumerCapacity(this);
+        } 
+        catch (Exception e1) 
+        {
+            AMQException ex = new AMQException(AMQConstant.INTERNAL_ERROR, "Error retrieving capacity",e1);
+            throw ex;
+        }
         
         try
         {
-            boolean isTopic;
+            boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
             Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
             
             if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
             {
-                isTopic = consumer.getDestination() instanceof AMQTopic ||
+                boolean isTopic = consumer.getDestination() instanceof AMQTopic ||
                           consumer.getDestination().getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS) ;
                 
                 preAcquire = isTopic || (!consumer.isNoConsume()  && 
                         (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")));
+                
+                getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag),
+                                                 acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
+                                                 preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED,
+                                                 null, 0, arguments,
+                                                 consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
             }
             else
             {
-                isTopic = consumer.getDestination().getAddressType() == AMQDestination.TOPIC_TYPE;
-                
+                AddressBasedDestination dest = (AddressBasedDestination)consumer.getDestination();
                 preAcquire = !consumer.isNoConsume() && 
-                             (isTopic || consumer.getMessageSelector() == null || 
+                             (dest.isTopic() || consumer.getMessageSelector() == null || 
                               consumer.getMessageSelector().equals(""));
                 
-                arguments.putAll(
-                        (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
+                SubscriptionSettings_0_10 settings = new SubscriptionSettings_0_10();
+                settings.setAcceptMode(acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT);
+                settings.setAccquireMode(preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED);
+                settings.setArgs(arguments);
+                settings.setMessageSelector(messageSelector);
+                settings.setSubscriptionTag(String.valueOf(tag));
+                dest.createSubscription(this,settings);
             }
-            
-            boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
-            
-            if (consumer.getDestination().getLink() != null)
-            {
-                acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE;
-            }
-            
-            getQpidSession().messageSubscribe
-                (queueName.toString(), String.valueOf(tag),
-                 acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
-                 preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
-                 consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
         }
         catch (JMSException e)
         {
@@ -646,21 +624,6 @@
         }
     }
 
-    private long getCapacity(AMQDestination destination)
-    {
-        long capacity = 0;
-        if (destination.getDestSyntax() == DestSyntax.ADDR && 
-                destination.getLink().getConsumerCapacity() > 0)
-        {
-            capacity = destination.getLink().getConsumerCapacity();
-        }
-        else if (prefetch())
-        {
-            capacity = getAMQConnection().getMaxPrefetch();
-        }
-        return capacity;
-    }
-
     /**
      * Create an 0_10 message producer
      */
@@ -775,12 +738,7 @@
         }
         else
         {
-            QueueNode node = (QueueNode)amqd.getSourceNode();
-            getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() ,
-                    node.getDeclareArgs(),
-                    node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
-                    node.isDurable() ? Option.DURABLE : Option.NONE,
-                    node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);   
+            // do nothing   
         }
 
         // passive --> false
@@ -825,7 +783,7 @@
                 //only set if msg list is null
                 try
                 {
-                    long capacity = getCapacity(consumer.getDestination());
+                    long capacity = consumer.getDestination().getConsumerCapacity(this);
                     
                     if (capacity == 0)
                     {
@@ -1056,317 +1014,6 @@
     {
         return AMQMessageDelegateFactory.FACTORY_0_10;
     }
-    
-    public boolean isExchangeExist(AMQDestination dest,ExchangeNode node,boolean assertNode)
-    {
-        boolean match = true;
-        ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get();
-        match = !result.getNotFound();        
-        
-        if (match)
-        {
-            if (assertNode)
-            {
-                match =  (result.getDurable() == node.isDurable()) && 
-                         (node.getExchangeType() != null && 
-                          node.getExchangeType().equals(result.getType())) &&
-                         (matchProps(result.getArguments(),node.getDeclareArgs()));
-            }            
-            else if (node.getExchangeType() != null)
-            {
-                // even if assert is false, better to verify this
-                match = node.getExchangeType().equals(result.getType());
-                if (!match)
-                {
-                    _logger.debug("Exchange type doesn't match. Expected : " +  node.getExchangeType() +
-                             " actual " + result.getType());
-                }
-            }
-            else
-            {
-                _logger.debug("Setting Exchange type " + result.getType());
-                node.setExchangeType(result.getType());
-                dest.setExchangeClass(new AMQShortString(result.getType()));
-            }
-        }
-        
-        return match;
-    }
-    
-    public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException
-    {
-        boolean match = true;
-        try
-        {
-            QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
-            match = dest.getAddressName().equals(result.getQueue());
-            
-            if (match && assertNode)
-            {
-                match = (result.getDurable() == node.isDurable()) && 
-                         (result.getAutoDelete() == node.isAutoDelete()) &&
-                         (result.getExclusive() == node.isExclusive()) &&
-                         (matchProps(result.getArguments(),node.getDeclareArgs()));
-            }
-            else if (match)
-            {
-                // should I use the queried details to update the local data structure.
-            }
-        }
-        catch(SessionException e)
-        {
-            if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED)
-            {
-                match = false;
-            }
-            else
-            {
-                throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()),
-                        "Error querying queue",e);
-            }
-        }
-        
-        return match;
-    }
-    
-    private boolean matchProps(Map<String,Object> target,Map<String,Object> source)
-    {
-        boolean match = true;
-        for (String key: source.keySet())
-        {
-            match = target.containsKey(key) && 
-                    target.get(key).equals(source.get(key));
-            
-            if (!match) 
-            { 
-                StringBuffer buf = new StringBuffer();
-                buf.append("Property given in address did not match with the args sent by the broker.");
-                buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, ");
-                buf.append(" Actual { ").append(key).append(" : ").append(target.get(key)).append(" }");
-                _logger.debug(buf.toString());
-                return match;
-            }
-        }
-        
-        return match;
-    }
-
-    /**
-     * 1. Try to resolve the address type (queue or exchange)
-     * 2. if type == queue, 
-     *       2.1 verify queue exists or create if create == true
-     *       2.2 If not throw exception
-     *       
-     * 3. if type == exchange,
-     *       3.1 verify exchange exists or create if create == true
-     *       3.2 if not throw exception
-     *       3.3 if exchange exists (or created) create subscription queue.
-     */
-    
-    @SuppressWarnings("deprecation")
-    public void handleAddressBasedDestination(AMQDestination dest, 
-                                              boolean isConsumer,
-                                              boolean noWait) throws AMQException
-    {
-        if (dest.isAddressResolved())
-        {           
-            if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) 
-            {
-                createSubscriptionQueue(dest);
-            }
-        }
-        else
-        {
-            boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || 
-                                 (isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
-                                 (!isConsumer && dest.getAssert() == AddressOption.SENDER);
-            
-            boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) ||
-                                 (isConsumer && dest.getCreate() == AddressOption.RECEIVER) ||
-                                 (!isConsumer && dest.getCreate() == AddressOption.SENDER);
-                        
-            
-            
-            int type = resolveAddressType(dest);
-            
-            if (type == AMQDestination.QUEUE_TYPE && 
-                    dest.getLink().getReliability() == Reliability.UNSPECIFIED)
-            {
-                dest.getLink().setReliability(Reliability.AT_LEAST_ONCE);
-            }
-            else if (type == AMQDestination.TOPIC_TYPE && 
-                    dest.getLink().getReliability() == Reliability.UNSPECIFIED)
-            {
-                dest.getLink().setReliability(Reliability.UNRELIABLE);
-            }
-            else if (type == AMQDestination.TOPIC_TYPE && 
-                    dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE)
-            {
-                throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics");                      
-            }
-            
-            switch (type)
-            {
-                case AMQDestination.QUEUE_TYPE: 
-                {
-                    if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode))
-                    {
-                        setLegacyFiledsForQueueType(dest);
-                        break;
-                    }
-                    else if(createNode)
-                    {
-                        setLegacyFiledsForQueueType(dest);
-                        send0_10QueueDeclare(dest,null,false,noWait);
-                        sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
-                                      null,dest.getExchangeName(),dest, false);
-                        break;
-                    }                
-                }
-                
-                case AMQDestination.TOPIC_TYPE: 
-                {
-                    if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode))
-                    {                    
-                        setLegacyFiledsForTopicType(dest);
-                        verifySubject(dest);
-                        if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) 
-                        {  
-                            createSubscriptionQueue(dest);
-                        }
-                        break;
-                    }
-                    else if(createNode)
-                    {                    
-                        setLegacyFiledsForTopicType(dest);
-                        verifySubject(dest);
-                        sendExchangeDeclare(dest.getAddressName(), 
-                                dest.getExchangeClass().asString(),
-                                dest.getTargetNode().getAlternateExchange(),
-                                dest.getTargetNode().getDeclareArgs(),
-                                false);        
-                        if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) 
-                        {
-                            createSubscriptionQueue(dest);
-                        }
-                        break;
-                    }
-                }
-                
-                default:
-                    throw new AMQException(
-                            "The name '" + dest.getAddressName() +
-                            "' supplied in the address doesn't resolve to an exchange or a queue");
-            }
-            dest.setAddressResolved(true);
-        }
-    }
-    
-    public int resolveAddressType(AMQDestination dest) throws AMQException
-    {
-       int type = dest.getAddressType();
-       String name = dest.getAddressName();
-       if (type != AMQDestination.UNKNOWN_TYPE)
-       {
-           return type;
-       }
-       else
-       {
-            ExchangeBoundResult result = getQpidSession().exchangeBound(name,name,null,null).get();
-            if (result.getQueueNotFound() && result.getExchangeNotFound()) {
-                //neither a queue nor an exchange exists with that name; treat it as a queue
-                type = AMQDestination.QUEUE_TYPE;
-            } else if (result.getExchangeNotFound()) {
-                //name refers to a queue
-                type = AMQDestination.QUEUE_TYPE;
-            } else if (result.getQueueNotFound()) {
-                //name refers to an exchange
-                type = AMQDestination.TOPIC_TYPE;
-            } else {
-                //both a queue and exchange exist for that name
-                throw new AMQException("Ambiguous address, please specify queue or topic as node type");
-            }
-            dest.setAddressType(type);
-            dest.rebuildTargetAndSourceNodes(type);
-            return type;
-        }        
-    }
-    
-    private void verifySubject(AMQDestination dest) throws AMQException
-    {
-        if (dest.getSubject() == null || dest.getSubject().trim().equals(""))
-        {
-            
-            if ("topic".equals(dest.getExchangeClass().toString()))
-            {
-                dest.setRoutingKey(new AMQShortString("#"));
-                dest.setSubject(dest.getRoutingKey().toString());
-            }
-            else
-            {
-                dest.setRoutingKey(new AMQShortString(""));
-                dest.setSubject("");
-            }
-        }
-    }
-    
-    private void createSubscriptionQueue(AMQDestination dest) throws AMQException
-    {
-        QueueNode node = (QueueNode)dest.getSourceNode();  // source node is never null
-        
-        if (dest.getQueueName() == null)
-        {
-            if (dest.getLink() != null && dest.getLink().getName() != null) 
-            {
-                dest.setQueueName(new AMQShortString(dest.getLink().getName())); 
-            }
-        }
-        node.setExclusive(true);
-        node.setAutoDelete(!node.isDurable());
-        send0_10QueueDeclare(dest,null,false,true);
-        node.addBinding(new Binding(dest.getAddressName(),
-                                    dest.getQueueName(),// should have one by now
-                                    dest.getSubject(),
-                                    Collections.<String,Object>emptyMap()));
-        sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
-                null,dest.getExchangeName(),dest, false);
-    }
-    
-    public void setLegacyFiledsForQueueType(AMQDestination dest)
-    {
-        // legacy support
-        dest.setQueueName(new AMQShortString(dest.getAddressName()));
-        dest.setExchangeName(new AMQShortString(""));
-        dest.setExchangeClass(new AMQShortString(""));
-        dest.setRoutingKey(dest.getAMQQueueName());
-    }
-
-    public void setLegacyFiledsForTopicType(AMQDestination dest)
-    {
-        // legacy support
-        dest.setExchangeName(new AMQShortString(dest.getAddressName()));
-        ExchangeNode node = (ExchangeNode)dest.getTargetNode();
-        dest.setExchangeClass(node.getExchangeType() == null? 
-                              ExchangeDefaults.TOPIC_EXCHANGE_CLASS:
-                              new AMQShortString(node.getExchangeType()));  
-        dest.setRoutingKey(new AMQShortString(dest.getSubject()));
-    }
-    
-    /** This should be moved to a suitable utility class */
-    private String printMap(Map<String,Object> map)
-    {
-        StringBuilder sb = new StringBuilder();
-        sb.append("<");
-        if (map != null)
-        {
-            for(String key : map.keySet())
-            {
-                sb.append(key).append(" = ").append(map.get(key)).append(" ");
-            }
-        }
-        sb.append(">");
-        return sb.toString();
-    }
 
     protected void acknowledgeImpl()
     {
@@ -1389,4 +1036,16 @@
         _highestDeliveryTag.set(-1);
         super.resubscribe();
     }
+    
+    public boolean isQueueExist(String queue)
+    {
+        QueueQueryResult result = _qpidSession.queueQuery(queue, Option.NONE).get();
+        return queue.equals(result.getQueue());        
+    }
+    
+    public boolean isExchangeExist(String exchange)
+    {
+        ExchangeQueryResult result = _qpidSession.exchangeQuery(exchange, Option.NONE).get();
+        return !result.getNotFound();        
+    }    
 }
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 369c8a6..5c59939 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -590,14 +590,6 @@
     {    
         declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
     }
-
-    public void handleAddressBasedDestination(AMQDestination dest, 
-                                              boolean isConsumer,
-                                              boolean noWait) throws AMQException
-    {
-        throw new UnsupportedOperationException("The new addressing based sytanx is "
-                + "not supported for AMQP 0-8/0-9 versions");
-    }
     
     protected void flushAcknowledgments()
     {
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 780dbca..54736b0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -33,16 +33,7 @@
 
 public class AMQTopic extends AMQDestination implements Topic
 {
-    public AMQTopic(String address) throws URISyntaxException
-    {
-        super(address);
-    }
-
-    public AMQTopic(Address address) throws Exception
-    {
-        super(address);
-    }
-    
+   
     /**
      * Constructor for use in creating a topic using a BindingURL.
      *
@@ -102,37 +93,8 @@
         if (topic instanceof AMQDestination && topic instanceof javax.jms.Topic)
         {
             AMQDestination qpidTopic = (AMQDestination)topic;
-            if (qpidTopic.getDestSyntax() == DestSyntax.ADDR)
-            {
-                try
-                {
-                    AMQTopic t = new AMQTopic(qpidTopic.getAddress());
-                    AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
-                    // link is never null if dest was created using an address string.
-                    t.getLink().setName(queueName.asString());               
-                    t.getSourceNode().setAutoDelete(false);
-                    t.getSourceNode().setDurable(true);
-                    
-                    // The legacy fields are also populated just in case.
-                    t.setQueueName(queueName);
-                    t.setAutoDelete(false);
-                    t.setDurable(true);
-                    return t;
-                }
-                catch(Exception e)
-                {
-                    JMSException ex = new JMSException("Error creating durable topic");
-                    ex.initCause(e);
-                    ex.setLinkedException(e);
-                    throw ex;
-                }
-            }
-            else
-            {
                 return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false,
-                                getDurableTopicQueueName(subscriptionName, connection),
-                                true);
-            }
+                                    getDurableTopicQueueName(subscriptionName, connection),true);
         }
         else
         {
@@ -151,10 +113,6 @@
         {
             return getRoutingKey().asString();
         }
-        else if (getSubject() != null)
-        {
-            return getSubject();
-        }
         else
         {
             return null;
@@ -164,14 +122,7 @@
     @Override
     public AMQShortString getExchangeName()
     {
-        if (super.getExchangeName() == null && super.getAddressName() != null)
-        {
-            return new AMQShortString(super.getAddressName());
-        }
-        else
-        {
-            return _exchangeName;
-        }
+        return _exchangeName;
     }
 
     public AMQShortString getRoutingKey()
@@ -180,15 +131,9 @@
         {
             return super.getRoutingKey();            
         }
-        else if (getSubject() != null)
-        {
-            return new AMQShortString(getSubject());
-        }
         else
         {
-            setRoutingKey(new AMQShortString(""));
-            setSubject("");
-            return super.getRoutingKey();
+            return null;
         }
     }
 
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 5fba351..7721722 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -17,27 +17,36 @@
  */
 package org.apache.qpid.client;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.qpid.client.AMQDestination.AddressOption;
-import org.apache.qpid.client.AMQDestination.DestSyntax;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.*;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.*;
-import org.apache.qpid.filter.MessageFilter;
-import org.apache.qpid.filter.JMSSelectorFilter;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.filter.MessageFilter;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.messaging.QpidDestination.CheckMode;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.Acquired;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.TransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is a 0.10 message consumer.
@@ -83,6 +92,16 @@
         super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
                 arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
         _0_10session = (AMQSession_0_10) session;
+        
+        if (AMQDestination.DestSyntax.ADDR == destination.getDestSyntax())
+        {
+            AddressBasedDestination addrDest = (AddressBasedDestination)destination;
+            addrDest.resolveAddress((AMQSession_0_10)session);
+            addrDest.create((AMQSession_0_10)session,CheckMode.FOR_RECEIVER); 
+            addrDest.azzert((AMQSession_0_10)session,CheckMode.FOR_RECEIVER);
+            // ideally we should be invoking addrDest.createSubscription() here;
+        }
+        
         if (messageSelector != null && !messageSelector.equals(""))
         {
             try
@@ -93,33 +112,24 @@
             {
                 throw new InvalidSelectorException("cannot create consumer because of selector issue");
             }
+            
             if (destination instanceof AMQQueue)
             {
                 _preAcquire = false;
             }
         }
         
-        // Destination setting overrides connection defaults
-        if (destination.getDestSyntax() == DestSyntax.ADDR && 
-                destination.getLink().getConsumerCapacity() > 0)
+        try
         {
-            capacity = destination.getLink().getConsumerCapacity();
+            capacity = destination.getConsumerCapacity(session);
         }
-        else if (getSession().prefetch())
+        catch(Exception e)
         {
-            capacity = _0_10session.getAMQConnection().getMaxPrefetch();
-        }
-
-        if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) 
-        {            
-            boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ; 
-            
-            if (!namedQueue)
-            {
-                _destination = destination.copyDestination();
-                _destination.setQueueName(null);
-            }
-        }
+            JMSException ex = new JMSException("Error retrieving capacity");
+            ex.initCause(e);
+            ex.setLinkedException(e);
+            throw ex;
+        }        
     }
 
 
@@ -473,22 +483,7 @@
     
     public boolean isExclusive()
     {
-        AMQDestination dest = this.getDestination();
-        if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
-        {
-            if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
-            {
-                return true;
-            }
-            else
-            {                
-                return dest.getLink().getSubscription().isExclusive();
-            }
-        }
-        else
-        {
-            return _exclusive;
-        }
+        return _exclusive;
     }
     
     void cleanupQueue() throws AMQException, FailoverException
@@ -496,11 +491,14 @@
         AMQDestination dest = this.getDestination();
         if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
         {
-            if (dest.getDelete() == AddressOption.ALWAYS ||
-                dest.getDelete() == AddressOption.RECEIVER )
+            try
             {
-                ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
-                        this.getDestination().getQueueName());
+                ((AddressBasedDestination)dest).delete((AMQSession_0_10)getSession(),CheckMode.FOR_RECEIVER);
+            }
+            catch(Exception e)
+            {
+                AMQException ex = new AMQException(AMQConstant.INTERNAL_ERROR,"Error deleting queue",e);
+                throw ex;
             }
         }
     }
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 57f64c2..d2a88bc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -31,13 +31,13 @@
 import javax.jms.Message;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
 import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.QpidMessageProperties;
-import org.apache.qpid.client.messaging.address.Link.Reliability;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.messaging.QpidDestination.CheckMode;
+import org.apache.qpid.messaging.address.Link.Reliability;
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageAcceptMode;
@@ -86,7 +86,10 @@
         {       
             try
             {
-                getSession().handleAddressBasedDestination(destination,false,false);
+                AddressBasedDestination addrDest = (AddressBasedDestination)destination;
+                addrDest.resolveAddress((AMQSession_0_10)getSession());
+                addrDest.create((AMQSession_0_10)getSession(),CheckMode.FOR_SENDER); 
+                addrDest.azzert((AMQSession_0_10)getSession(),CheckMode.FOR_SENDER);
             }
             catch(Exception e)
             {
@@ -165,11 +168,13 @@
             deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority));
             message.setJMSPriority(priority);
         }
+        
         String exchangeName = destination.getExchangeName() == null ? "" : destination.getExchangeName().toString();
         if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName))
         {
             deliveryProp.setExchange(exchangeName);
         }
+        
         String routingKey = destination.getRoutingKey().toString();
         if (deliveryProp.getRoutingKey() == null || ! deliveryProp.getRoutingKey().equals(routingKey))
         {
@@ -177,7 +182,7 @@
         }
         
         if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR && 
-           (destination.getSubject() != null || 
+           (((AddressBasedDestination)destination).getAddress().getSubject() != null || 
               (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT) != null))
            )
         {
@@ -191,10 +196,11 @@
             if (appProps.get(QpidMessageProperties.QPID_SUBJECT) == null)
             {
                 // use default subject in address string
-                appProps.put(QpidMessageProperties.QPID_SUBJECT,destination.getSubject());
+                appProps.put(QpidMessageProperties.QPID_SUBJECT,
+                        ((AddressBasedDestination)destination).getAddress().getSubject());
             }
                     
-            if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
+            if (((AddressBasedDestination)destination).isTopic())
             {
                 deliveryProp.setRoutingKey((String)
                         messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT));                
@@ -218,8 +224,8 @@
                          deliveryMode == DeliveryMode.PERSISTENT)
                    );  
             
-            boolean unreliable = (destination.getDestSyntax() == DestSyntax.ADDR) &&
-                                 (destination.getLink().getReliability() == Reliability.UNRELIABLE);
+            boolean unreliable = false; //(destination.getDestSyntax() == DestSyntax.ADDR) &&
+            //                     (destination.getLink().getReliability() == Reliability.UNRELIABLE);
             
 
             ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.slice();
@@ -258,19 +264,15 @@
         AMQDestination dest = _destination;
         if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
         {
-            if (dest.getDelete() == AddressOption.ALWAYS ||
-                dest.getDelete() == AddressOption.SENDER )
+            try
             {
-                try
-                {
-                    ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
-                        _destination.getQueueName());
-                }
-                catch(TransportException e)
-                {
-                    throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
-                }
+                ((AddressBasedDestination)dest).delete((AMQSession_0_10)getSession(),CheckMode.FOR_SENDER);
             }
+            catch(TransportException e)
+            {
+                throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+            }
+
         }
     }
 
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index f360b54..c6c36b9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -42,6 +42,7 @@
 import org.apache.qpid.AMQPInvalidClassException;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.client.AddressBasedDestination;
 import org.apache.qpid.client.CustomJMSXProperty;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.jms.Message;
@@ -271,13 +272,14 @@
     private Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject)
     {
         String addr;
-        if ("".equals(exchange)) // type Queue
+        if ("".equals(exchange)) // type Queue and the routing key is the Queue name.
         {
             subject = (subject == null) ? "" : "/" + subject;
             addr = routingKey + subject;
         }
         else
         {
+            // routing key is the subject here.
             addr = exchange + "/" + routingKey;
         }
         
@@ -315,36 +317,14 @@
 
         if (amqd.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
         {
-           try
-           {
-               int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd);
-               if (type == AMQDestination.QUEUE_TYPE)
-               {
-                   ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForQueueType(amqd);
-               }
-               else
-               {
-                   ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForTopicType(amqd);
-               }
-           }
-           catch(AMQException ex)
-           {
-               JMSException e = new JMSException("Error occured while figuring out the node type");
-               e.initCause(ex);
-               e.setLinkedException(ex);
-               throw e;
-           }
-           catch (TransportException e)
-           {
-               JMSException jmse = new JMSException("Exception occured while figuring out the node type:" + e.getMessage());
-               jmse.initCause(e);
-               jmse.setLinkedException(e);
-               throw jmse;
-           }
-
+            AddressBasedDestination dest = (AddressBasedDestination)amqd;
+            dest.resolveAddress((AMQSession_0_10)_session);
         }
         
-        final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString());
+        String exchangeName = amqd.getExchangeName().asString();
+        String routingKey = amqd.getRoutingKey().asString();
+        
+        final ReplyTo replyTo = new ReplyTo(exchangeName, routingKey);
         _destinationCache.put(replyTo, new SoftReference<Destination>(destination));
         _messageProps.setReplyTo(replyTo);
 
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
index 1b6c0c7..424e7d7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
@@ -26,7 +26,6 @@
 import javax.jms.JMSException;
 import javax.jms.Session;
 
-import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
@@ -85,7 +84,7 @@
     }
 
     /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
-    private AMQSession<?,?> _session;
+    protected AMQSession<?,?> _session;
     private final long _deliveryTag;
 
     protected AbstractAMQMessageDelegate(long deliveryTag)
@@ -132,14 +131,17 @@
         }
         else
         {
-            dest = new AMQAnyDestination(exchange,
-                                         new AMQShortString(exchangeInfo.exchangeType),
-                                         routingKey,
-                                         false,
-                                         false,
-                                         routingKey,
-                                         false,
-                                         new AMQShortString[] {routingKey});
+            // This is to cater to fanout, match and nameless exchange types.
+            // This method is only used if the syntax is BURL.
+            // See AMQMessageDelegate_0_10.java for more details.
+            dest = new AMQQueue(exchange,
+                                new AMQShortString(exchangeInfo.exchangeType),
+                                routingKey,
+                                routingKey,
+                                false,
+                                false,
+                                false,
+                                new AMQShortString[] {routingKey});
         }
 
         return dest;
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
deleted file mode 100644
index 368ec60..0000000
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.messaging.address;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQDestination.Binding;
-import org.apache.qpid.client.messaging.address.Link.Reliability;
-import org.apache.qpid.client.messaging.address.Link.Subscription;
-import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
-import org.apache.qpid.client.messaging.address.Node.UnknownNodeType;
-import org.apache.qpid.configuration.Accessor;
-import org.apache.qpid.configuration.Accessor.MapAccessor;
-import org.apache.qpid.messaging.Address;
-
-/**
- * Utility class for extracting information from the address class
- */
-public class AddressHelper
-{
-    public static final String NODE = "node";
-    public static final String LINK = "link";
-    public static final String X_DECLARE = "x-declare";
-    public static final String X_BINDINGS = "x-bindings";
-    public static final String X_SUBSCRIBE = "x-subscribes";
-    public static final String CREATE = "create";
-    public static final String ASSERT = "assert";
-    public static final String DELETE = "delete";
-    public static final String FILTER = "filter";
-    public static final String NO_LOCAL = "no-local";
-    public static final String DURABLE = "durable";
-    public static final String EXCLUSIVE = "exclusive";
-    public static final String AUTO_DELETE = "auto-delete";
-    public static final String TYPE = "type";
-    public static final String ALT_EXCHANGE = "alternate-exchange";
-    public static final String BINDINGS = "bindings";
-    public static final String BROWSE = "browse";
-    public static final String MODE = "mode";
-    public static final String CAPACITY = "capacity";
-    public static final String CAPACITY_SOURCE = "source";
-    public static final String CAPACITY_TARGET = "target";
-    public static final String NAME = "name";
-    public static final String EXCHANGE = "exchange";
-    public static final String QUEUE = "queue";
-    public static final String KEY = "key";
-    public static final String ARGUMENTS = "arguments";
-    public static final String RELIABILITY = "reliability";
-
-    private Address address;
-    private Accessor addressProps;
-    private Accessor nodeProps;
-    private Accessor linkProps;
-
-    public AddressHelper(Address address)
-    {
-        this.address = address;
-        addressProps = new MapAccessor(address.getOptions());
-        Map node_props = address.getOptions() == null
-                || address.getOptions().get(NODE) == null ? null
-                : (Map) address.getOptions().get(NODE);
-
-        if (node_props != null)
-        {
-            nodeProps = new MapAccessor(node_props);
-        }
-
-        Map link_props = address.getOptions() == null
-                || address.getOptions().get(LINK) == null ? null
-                : (Map) address.getOptions().get(LINK);
-
-        if (link_props != null)
-        {
-            linkProps = new MapAccessor(link_props);
-        }
-    }
-
-    public String getCreate()
-    {
-        return addressProps.getString(CREATE);
-    }
-
-    public String getAssert()
-    {
-        return addressProps.getString(ASSERT);
-    }
-
-    public String getDelete()
-    {
-        return addressProps.getString(DELETE);
-    }
-
-    public boolean isNoLocal()
-    {
-        Boolean b = nodeProps.getBoolean(NO_LOCAL);
-        return b == null ? false : b;
-    }
-
-    public boolean isBrowseOnly()
-    {
-        String mode = addressProps.getString(MODE);
-        return mode != null && mode.equals(BROWSE) ? true : false;
-    }
-
-    @SuppressWarnings("unchecked")
-    public List<Binding> getBindings(Map props)
-    {
-        List<Binding> bindings = new ArrayList<Binding>();
-        List<Map> bindingList = (List<Map>) props.get(X_BINDINGS);
-        if (bindingList != null)
-        {
-            for (Map bindingMap : bindingList)
-            {
-                Binding binding = new Binding(
-                        (String) bindingMap.get(EXCHANGE),
-                        (String) bindingMap.get(QUEUE),
-                        (String) bindingMap.get(KEY),
-                        bindingMap.get(ARGUMENTS) == null ? Collections.EMPTY_MAP
-                                : (Map<String, Object>) bindingMap
-                                        .get(ARGUMENTS));
-                bindings.add(binding);
-            }
-        }
-        return bindings;
-    }
-
-    public Map getDeclareArgs(Map props)
-    {
-        if (props != null && props.get(X_DECLARE) != null)
-        {
-            return (Map) props.get(X_DECLARE);
-            
-        } else
-        {
-            return Collections.EMPTY_MAP;
-        }
-    }
-
-    public int getTargetNodeType() throws Exception
-    {
-        if (nodeProps == null || nodeProps.getString(TYPE) == null)
-        {
-            // need to query and figure out
-            return AMQDestination.UNKNOWN_TYPE;
-        } else if (nodeProps.getString(TYPE).equals("queue"))
-        {
-            return AMQDestination.QUEUE_TYPE;
-        } else if (nodeProps.getString(TYPE).equals("topic"))
-        {
-            return AMQDestination.TOPIC_TYPE;
-        } else
-        {
-            throw new Exception("unkown exchange type");
-        }
-    }
-
-    public Node getTargetNode(int addressType)
-    {
-        // target node here is the default exchange
-        if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE)
-        {
-            return new ExchangeNode();
-        } else if (addressType == AMQDestination.TOPIC_TYPE)
-        {
-            Map node = (Map) address.getOptions().get(NODE);
-            return createExchangeNode(node);
-        } else
-        {
-            // don't know yet
-            return null;
-        }
-    }
-
-    private Node createExchangeNode(Map parent)
-    {
-        Map declareArgs = getDeclareArgs(parent);
-        MapAccessor argsMap = new MapAccessor(declareArgs);
-        ExchangeNode node = new ExchangeNode();
-        node.setExchangeType(argsMap.getString(TYPE) == null ? null : argsMap
-                .getString(TYPE));
-        fillInCommonNodeArgs(node, parent, argsMap);
-        return node;
-    }
-
-    private Node createQueueNode(Map parent)
-    {
-        Map declareArgs = getDeclareArgs(parent);
-        MapAccessor argsMap = new MapAccessor(declareArgs);
-        QueueNode node = new QueueNode();
-        node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
-        node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null ? false
-                : argsMap.getBoolean(EXCLUSIVE));
-        fillInCommonNodeArgs(node, parent, argsMap);
-
-        return node;
-    }
-
-    private void fillInCommonNodeArgs(Node node, Map parent, MapAccessor argsMap)
-    {
-        node.setDurable(getDurability(parent));
-        node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null ? false
-                : argsMap.getBoolean(AUTO_DELETE));
-        node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
-        node.setBindings(getBindings(parent));
-        if (getDeclareArgs(parent).containsKey(ARGUMENTS))
-        {
-            node.setDeclareArgs((Map<String,Object>)getDeclareArgs(parent).get(ARGUMENTS));
-        }
-    }
-    
-    private boolean getDurability(Map map)
-    {
-        Accessor access = new MapAccessor(map);
-        Boolean result = access.getBoolean(DURABLE);
-        return (result == null) ? false : result.booleanValue();
-    }
-
-    /**
-     * if the type == queue x-declare args from the node props is used. if the
-     * type == exchange x-declare args from the link props is used else just
-     * create a default temp queue.
-     */
-    public Node getSourceNode(int addressType)
-    {
-        if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null)
-        {
-            return createQueueNode((Map) address.getOptions().get(NODE));
-        }
-        if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null)
-        {
-            return createQueueNode((Map) address.getOptions().get(LINK));
-        } else
-        {
-            // need to query the info
-            return new QueueNode();
-        }
-    }
-
-    public Link getLink() throws Exception
-    {
-        Link link = new Link();
-        link.setSubscription(new Subscription());
-        if (linkProps != null)
-        {
-            link.setDurable(linkProps.getBoolean(DURABLE) == null ? false
-                    : linkProps.getBoolean(DURABLE));
-            link.setName(linkProps.getString(NAME));
-
-            String reliability = linkProps.getString(RELIABILITY);
-            if ( reliability != null)
-            {
-                if (reliability.equalsIgnoreCase("unreliable"))
-                {
-                    link.setReliability(Reliability.UNRELIABLE);
-                }
-                else if (reliability.equalsIgnoreCase("at-least-once"))
-                {
-                    link.setReliability(Reliability.AT_LEAST_ONCE);
-                }
-                else
-                {
-                    throw new Exception("The reliability mode '" + 
-                            reliability + "' is not yet supported");
-                }
-                
-            }
-            
-            if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
-            {
-                MapAccessor capacityProps = new MapAccessor(
-                        (Map) ((Map) address.getOptions().get(LINK))
-                                .get(CAPACITY));
-                link
-                        .setConsumerCapacity(capacityProps
-                                .getInt(CAPACITY_SOURCE) == null ? 0
-                                : capacityProps.getInt(CAPACITY_SOURCE));
-                link
-                        .setProducerCapacity(capacityProps
-                                .getInt(CAPACITY_TARGET) == null ? 0
-                                : capacityProps.getInt(CAPACITY_TARGET));
-            } 
-            else
-            {
-                int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps
-                        .getInt(CAPACITY);
-                link.setConsumerCapacity(cap);
-                link.setProducerCapacity(cap);
-            }
-            link.setFilter(linkProps.getString(FILTER));
-            // so far filter type not used
-            
-            if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE))
-            {   
-                Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE);
-                
-                if (x_subscribe.containsKey(ARGUMENTS))
-                {
-                    link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS));
-                }
-                
-                boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ?
-                                    Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false;
-                
-                link.getSubscription().setExclusive(exclusive);
-            }
-        }
-
-        return link;
-    }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
deleted file mode 100644
index 5f97d62..0000000
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.messaging.address;
-
-import static org.apache.qpid.client.messaging.address.Link.Reliability.UNSPECIFIED;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
-
-public class Link
-{ 
-    public enum FilterType { SQL92, XQUERY, SUBJECT }
-    
-    public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED }
-    
-    protected String name;
-    protected String _filter;
-    protected FilterType _filterType = FilterType.SUBJECT;
-    protected boolean _isNoLocal;
-    protected boolean _isDurable;
-    protected int _consumerCapacity = 0;
-    protected int _producerCapacity = 0;
-    protected Node node;
-    protected Subscription subscription;
-    protected Reliability reliability = UNSPECIFIED;
-    
-    public Reliability getReliability()
-    {
-        return reliability;
-    }
-
-    public void setReliability(Reliability reliability)
-    {
-        this.reliability = reliability;
-    }
-
-    public Node getNode()
-    {
-        return node;
-    }
-
-    public void setNode(Node node)
-    {
-        this.node = node;
-    }
-
-    public boolean isDurable()
-    {
-        return _isDurable;
-    }
-    
-    public void setDurable(boolean durable)
-    {
-        _isDurable = durable;
-    }
-     
-    public String getFilter()
-    {
-        return _filter;
-    }
-
-    public void setFilter(String filter)
-    {
-        this._filter = filter;
-    }
-
-    public FilterType getFilterType()
-    {
-        return _filterType;
-    }
-
-    public void setFilterType(FilterType type)
-    {
-        _filterType = type;
-    }
-
-    public boolean isNoLocal()
-    {
-        return _isNoLocal;
-    }
-
-    public void setNoLocal(boolean noLocal)
-    {
-        _isNoLocal = noLocal;
-    }
-
-    public int getConsumerCapacity()
-    {
-        return _consumerCapacity;
-    }
-
-    public void setConsumerCapacity(int capacity)
-    {
-        _consumerCapacity = capacity;
-    }
-    
-    public int getProducerCapacity()
-    {
-        return _producerCapacity;
-    }
-
-    public void setProducerCapacity(int capacity)
-    {
-        _producerCapacity = capacity;
-    }
-    
-    public String getName()
-    {
-        return name;
-    }
-
-    public void setName(String name)
-    {
-        this.name = name;
-    }
-    
-    public Subscription getSubscription()
-    {
-        return this.subscription;
-    }    
- 
-    public void setSubscription(Subscription subscription)
-    {
-        this.subscription = subscription;
-    }   
-    
-    public static class Subscription
-    {
-        private Map<String,Object> args = new HashMap<String,Object>();        
-        private boolean exclusive = false;
-        
-        public Map<String, Object> getArgs()
-        {
-            return args;
-        }
-        
-        public void setArgs(Map<String, Object> args)
-        {
-            this.args = args;
-        }
-        
-        public boolean isExclusive()
-        {
-            return exclusive;
-        }
-        
-        public void setExclusive(boolean exclusive)
-        {
-            this.exclusive = exclusive;
-        }
-    }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
deleted file mode 100644
index c98b194..0000000
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.client.messaging.address;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import javax.naming.OperationNotSupportedException;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQDestination.Binding;
-
-public abstract class Node
-{ 
-    protected int _nodeType = AMQDestination.UNKNOWN_TYPE;
-    protected boolean _isDurable;
-    protected boolean _isAutoDelete;
-    protected String _alternateExchange;
-    protected List<Binding> _bindings = new ArrayList<Binding>();
-    protected Map<String,Object> _declareArgs = Collections.emptyMap();
-    
-    public int getType()
-    {
-        return _nodeType;
-    }
-    
-    public boolean isDurable()
-    {
-        return _isDurable;
-    }
-    
-    public void setDurable(boolean durable)
-    {
-        _isDurable = durable;
-    }
-
-    public boolean isAutoDelete()
-    {
-        return _isAutoDelete;
-    }
-    
-    public void setAutoDelete(boolean autoDelete)
-    {
-        _isAutoDelete = autoDelete;
-    }
-    
-    public String getAlternateExchange()
-    {
-        return _alternateExchange;
-    }
-    
-    public void setAlternateExchange(String altExchange)
-    {
-        _alternateExchange = altExchange;
-    }
-    
-    public List<Binding> getBindings()
-    {
-        return _bindings;
-    }
-    
-    public void setBindings(List<Binding> bindings)
-    {
-        _bindings = bindings;
-    }
-    
-    public void addBinding(Binding binding) {
-        this._bindings.add(binding);
-    }
-    
-    public Map<String,Object> getDeclareArgs()
-    {
-        return _declareArgs;
-    }
-    
-    public void setDeclareArgs(Map<String,Object> options)
-    {
-        _declareArgs = options;
-    }   
-    
-    public static class QueueNode extends Node 
-    {
-       protected boolean _isExclusive;      
-       protected QpidQueueOptions _queueOptions = new QpidQueueOptions();
-       
-       public QueueNode()
-       {
-           _nodeType = AMQDestination.QUEUE_TYPE;
-       }
-       
-       public boolean isExclusive()
-       {
-           return _isExclusive;
-       }
-       
-       public void setExclusive(boolean exclusive)
-       {
-           _isExclusive = exclusive;
-       }  
-    }
-    
-    public static class ExchangeNode extends Node 
-    {
-       protected QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions();
-       protected String _exchangeType;
-       
-       public ExchangeNode()
-       {
-           _nodeType = AMQDestination.TOPIC_TYPE;
-       }
-       
-       public String getExchangeType()
-       {
-           return _exchangeType;
-       }
-       
-       public void setExchangeType(String exchangeType)
-       {
-           _exchangeType = exchangeType;
-       }
-    
-    }
-    
-    public static class UnknownNodeType extends Node 
-    {
-    }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
index cb3ab71..6a22113 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
@@ -30,8 +30,8 @@
 import javax.jms.MessageListener;
 import javax.jms.Session;
 
-import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQBrokerDetails;
+import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.Connection;
@@ -97,11 +97,12 @@
         {
             _ssn = _conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
             MessageConsumer cons = _ssn.createConsumer(
-                                        new AMQAnyDestination(new AMQShortString("amq.failover"),
-                                                              new AMQShortString("amq.failover"),
-                                                              new AMQShortString(""),
-                                                              true,true,null,false,
-                                                              new AMQShortString[0])); 
+                                        new AMQQueue(new AMQShortString("amq.failover"),
+                                                     new AMQShortString("amq.failover"),
+                                                     new AMQShortString(""),
+                                                     new AMQShortString(""),
+                                                     true,true,false,
+                                                     new AMQShortString[0])); 
             cons.setMessageListener(this);
         }                               
     }
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/QpidDestination.java b/java/client/src/main/java/org/apache/qpid/messaging/QpidDestination.java
new file mode 100644
index 0000000..d021ed9
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/QpidDestination.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging;
+
+public interface QpidDestination 
+{
+    public enum CheckMode {FOR_RECEIVER, FOR_SENDER};
+    
+    public void checkCreate(Session ssn,CheckMode mode) throws Exception;
+
+    public void checkAssert(Session ssn,CheckMode mode) throws Exception;
+    
+    public void checkDelete(Session ssn,CheckMode mode) throws Exception;    
+    
+    public void createSubscription(Session ssn,SubscriptionSettings settings) throws Exception;
+    
+    public void deleteSubscription(Session ssn) throws Exception;
+    
+    public String getSubscriptionQueue() throws Exception;
+    
+    public long getConsumerCapacity() throws Exception;
+    
+    public long getProducerCapacity() throws Exception;
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/QpidQueue.java b/java/client/src/main/java/org/apache/qpid/messaging/QpidQueue.java
new file mode 100644
index 0000000..6bc1846
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/QpidQueue.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging;
+
+public abstract class QpidQueue implements QpidDestination
+{
+    protected String queueName;
+    
+    public String getQueueName()
+    {
+        return queueName;
+    }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/QpidTopic.java b/java/client/src/main/java/org/apache/qpid/messaging/QpidTopic.java
new file mode 100644
index 0000000..05728c1
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/QpidTopic.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging;
+
+public abstract class QpidTopic implements QpidDestination 
+{
+    protected String topicName;
+    
+    public String getTopicName()
+    {
+        return topicName;
+    }    
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/Session.java b/java/client/src/main/java/org/apache/qpid/messaging/Session.java
new file mode 100644
index 0000000..bc3d745
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/Session.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging;
+
+/**
+ * Currently only a marker interface used primarily 
+ * for address destination refactoring QPID-3401
+ *
+ */
+public interface Session 
+{
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/SubscriptionSettings.java b/java/client/src/main/java/org/apache/qpid/messaging/SubscriptionSettings.java
new file mode 100644
index 0000000..e90d25e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/SubscriptionSettings.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging;
+
+public interface SubscriptionSettings 
+{
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressException.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressException.java
new file mode 100644
index 0000000..c1b2000
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressException.java
@@ -0,0 +1,14 @@
+package org.apache.qpid.messaging.address;
+
+public class AddressException extends Exception 
+{
+    public AddressException(String message)
+    {
+        super(message);
+    }
+    
+    public AddressException(String message, Throwable cause)
+    {
+        super(message,cause);
+    }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressHelper.java
new file mode 100644
index 0000000..7f5b1f9
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressHelper.java
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address;
+
+import static org.apache.qpid.messaging.address.AddressProperty.*;
+
+import org.apache.qpid.configuration.Accessor;
+import org.apache.qpid.configuration.Accessor.NestedMapAccessor;
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.Address.PolicyType;
+import org.apache.qpid.messaging.Address.AddressType;
+import org.apache.qpid.messaging.QpidDestination.CheckMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AddressHelper 
+{
+    private static final Logger _logger = LoggerFactory.getLogger(AddressHelper.class);
+    
+    protected Address address;
+    protected Accessor addressProps;
+
+    public AddressHelper(Address address)
+    {
+        this.address = address;
+        addressProps = new NestedMapAccessor(address.getOptions());
+    }
+
+    public PolicyType getCreate()
+    {
+        return PolicyType.getPolicyType(addressProps.getString(CREATE));
+    }
+
+    public PolicyType getAssert()
+    {
+        return PolicyType.getPolicyType(addressProps.getString(ASSERT));
+    }
+
+    public PolicyType getDelete()
+    {
+        return PolicyType.getPolicyType(addressProps.getString(DELETE));
+    }
+    
+    public boolean isBrowseOnly()
+    {
+        String mode = addressProps.getString(MODE);
+        return mode != null && mode.equals(BROWSE) ? true : false;
+    }
+    
+    public AddressType getNodeType()
+    {
+        String type = addressProps.getString(getFQN(NODE,TYPE));
+        if ("topic".equalsIgnoreCase(type))
+        {
+           return AddressType.TOPIC_ADDRESS;
+        }
+        else if ("queue".equalsIgnoreCase(type))
+        {
+            return AddressType.QUEUE_ADDRESS;
+        }
+        else
+        {
+            return AddressType.UNSPECIFIED;
+        }
+    }
+    
+    public boolean isNodeDurable()
+    {
+         Boolean b = addressProps.getBoolean(getFQN(NODE,DURABLE));
+         return b == null ? false : b.booleanValue();
+    }
+    
+    public String getLinkName()
+    {
+        return addressProps.getString(getFQN(LINK,NAME));
+    }
+    
+    public boolean isLinkDurable()
+    {
+         Boolean b = addressProps.getBoolean(getFQN(LINK,DURABLE));
+         return b == null ? false : b.booleanValue();
+    }
+    
+    public String getLinkReliability()
+    {
+        return addressProps.getString(getFQN(LINK,RELIABILITY));
+    }
+    
+    public int getLinkProducerCapacity()
+    {
+        return getCapacity(CheckMode.FOR_SENDER);
+    }
+    
+    public int getLinkConsumerCapacity()
+    {
+        return getCapacity(CheckMode.FOR_RECEIVER);
+    }
+    
+    private int getCapacity(CheckMode  mode)
+    {
+        int capacity = 0;
+        try
+        {
+            capacity = addressProps.getInt(getFQN(LINK,CAPACITY));            
+        }
+        catch(Exception e)
+        {
+            try
+            {
+                if (mode == CheckMode.FOR_RECEIVER)
+                {
+                    capacity = addressProps.getInt(getFQN(LINK,CAPACITY,CAPACITY_SOURCE));
+                }
+                else
+                {
+                    capacity = addressProps.getInt(getFQN(LINK,CAPACITY,CAPACITY_TARGET));
+                }
+            }
+            catch(Exception ex)
+            {
+                if (ex instanceof NumberFormatException && !ex.getMessage().equals("null"))
+                {
+                    _logger.info("Unable to retrieve capacity from address: " + address,ex);
+                }
+            }
+        }
+        
+        return capacity;
+    }
+    
+    public static boolean isAllowed(PolicyType policy, CheckMode mode)
+    {
+        return (policy == PolicyType.ALWAYS ||
+                (policy == PolicyType.RECEIVER && mode == CheckMode.FOR_RECEIVER) ||
+                (policy == PolicyType.SENDER && mode == CheckMode.FOR_SENDER)
+               );
+        
+    }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressProperty.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressProperty.java
new file mode 100644
index 0000000..26b0f2e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressProperty.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address;
+
+public final class AddressProperty 
+{
+    public static final String NODE = "node";
+    public static final String LINK = "link";
+    public static final String CREATE = "create";
+    public static final String ASSERT = "assert";
+    public static final String DELETE = "delete";
+    public static final String FILTER = "filter";
+    public static final String NO_LOCAL = "no-local";
+    public static final String DURABLE = "durable";
+    public static final String TYPE = "type";
+    public static final String BROWSE = "browse";
+    public static final String MODE = "mode";
+    public static final String CAPACITY = "capacity";
+    public static final String CAPACITY_SOURCE = "source";
+    public static final String CAPACITY_TARGET = "target";
+    public static final String NAME = "name";    
+    public static final String RELIABILITY = "reliability";    
+    
+    public static String getFQN(String... propNames)
+    {
+        StringBuilder sb = new StringBuilder();
+        for(String prop: propNames)
+        {
+            sb.append(prop).append("/");
+        }
+        return sb.substring(0, sb.length() -1);
+    }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/AddressResolver.java b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressResolver.java
new file mode 100644
index 0000000..d7cf231
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/AddressResolver.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address;
+
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.Address.AddressType;
+import org.apache.qpid.messaging.Address.PolicyType;
+import org.apache.qpid.messaging.QpidDestination;
+public interface AddressResolver 
+{
+    public QpidDestination resolve(Address address) throws AddressException;
+    
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/messaging/address/Link.java
new file mode 100644
index 0000000..55eb863
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/Link.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address;
+
+import static org.apache.qpid.messaging.address.Link.Reliability.UNSPECIFIED;
+
+public class Link 
+{
+    public enum FilterType { SQL92, XQUERY, SUBJECT }
+        
+    public enum Reliability 
+    { 
+        UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED; 
+        
+        public static Reliability getReliability(String reliability) throws AddressException
+        {
+            if (reliability == null)
+            {
+                return UNSPECIFIED;
+            }
+            else if (reliability.equalsIgnoreCase("unreliable"))
+            {
+                return UNRELIABLE;
+            }
+            else if (reliability.equalsIgnoreCase("at-least-once"))
+            {
+                return AT_LEAST_ONCE;
+            }
+            else
+            {
+                throw new AddressException("The reliability mode '" + 
+                        reliability + "' is not yet supported");
+            }
+        }
+    }
+    
+    protected String name;
+    protected String filter;
+    protected FilterType filterType = FilterType.SUBJECT;
+    protected boolean noLocal;
+    protected boolean durable;
+    protected int consumerCapacity = 0;
+    protected int producerCapacity = 0;
+    protected Reliability reliability = UNSPECIFIED;
+    
+    public Link(AddressHelper helper) throws AddressException
+    {
+        name = helper.getLinkName();
+        durable = helper.isLinkDurable();
+        reliability = Reliability.getReliability(helper.getLinkReliability());
+        consumerCapacity = helper.getLinkConsumerCapacity();
+        producerCapacity = helper.getLinkProducerCapacity();
+    }
+    
+    public Reliability getReliability()
+    {
+        return reliability;
+    }
+
+    public boolean isDurable()
+    {
+        return durable;
+    }
+     
+    public String getFilter()
+    {
+        return filter;
+    }
+
+    public FilterType getFilterType()
+    {
+        return filterType;
+    }
+
+    public boolean isNoLocal()
+    {
+        return noLocal;
+    }
+
+    public int getConsumerCapacity()
+    {
+        return consumerCapacity;
+    }
+   
+    public int getProducerCapacity()
+    {
+        return producerCapacity;
+    }
+    
+    public String getName()
+    {
+        return name;
+    }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/messaging/address/Node.java
new file mode 100644
index 0000000..29c65f0
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/Node.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address;
+
+import org.apache.qpid.messaging.Address.PolicyType;
+
+public class Node 
+{
+    boolean durable = false;
+    PolicyType createPolicy = PolicyType.NEVER;
+    PolicyType assertPolicy = PolicyType.NEVER;
+    PolicyType deletePolicy = PolicyType.NEVER;
+    
+    public Node(AddressHelper helper)
+    {
+        durable = helper.isNodeDurable();
+        createPolicy = helper.getCreate();
+        assertPolicy = helper.getAssert();
+        deletePolicy = helper.getDelete();
+    }
+    
+    public boolean isDurable()
+    {
+        return durable;
+    }
+    
+    public PolicyType getCreatePolicy()
+    {
+        return createPolicy;
+    }
+    
+    public PolicyType getAssertPolicy()
+    {
+        return assertPolicy;
+    }
+    
+    public PolicyType getDeletePolicy()
+    {
+        return deletePolicy;
+    }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressHelper_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressHelper_0_10.java
new file mode 100644
index 0000000..243b236
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressHelper_0_10.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import static org.apache.qpid.messaging.address.AddressProperty.*;
+import static org.apache.qpid.messaging.address.amqp_0_10.AddressProperty_0_10.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.configuration.Accessor;
+import org.apache.qpid.configuration.Accessor.MapAccessor;
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.address.AddressHelper;
+
+
+public class AddressHelper_0_10 extends AddressHelper 
+{
+    protected Accessor declareProps;
+    
+    public AddressHelper_0_10(Address address) 
+    {
+        super(address);
+    }
+    
+    // Node properties --------------------
+    
+    public boolean isNodeExclusive()
+    {
+        Boolean b = addressProps.getBoolean(getFQN(NODE,X_DECLARE,EXCLUSIVE));
+        return b == null ? false : b.booleanValue();
+    }
+    
+    public boolean isNodeAutoDelete()
+    {
+        Boolean b = addressProps.getBoolean(getFQN(NODE,X_DECLARE,AUTO_DELETE));
+        return b == null ? false : b.booleanValue();
+    }
+    
+    public String getNodeAltExchange()
+    {
+       return addressProps.getString(getFQN(NODE,X_DECLARE,ALT_EXCHANGE));
+    }
+
+    public Map getNodeDeclareArgs()
+    {
+        Map map = addressProps.getMap(getFQN(NODE,X_DECLARE,ARGUMENTS));
+        return map == null ? Collections.EMPTY_MAP : map;
+    }
+    
+    public String getNodeExchangeType() 
+    {
+        String type = addressProps.getString(getFQN(NODE,X_DECLARE,EXCHANGE_TYPE));
+        return type == null ? "topic" : type;
+    }
+    
+    public List<Binding> getNodeBindings()
+    {
+       return getBindings(addressProps.getList(getFQN(NODE,X_BINDINGS)));
+    }
+
+    // Link properties --------------------
+    
+    public List<Binding> getLinkQueueBindings()
+    {
+       return getBindings(addressProps.getList(getFQN(LINK,X_BINDINGS)));
+    }
+    
+    public Map getLinkQueueDeclareArgs()
+    {
+        Map map = addressProps.getMap(getFQN(LINK,X_DECLARE,ARGUMENTS));
+        return map == null ? Collections.EMPTY_MAP : map;
+    }   
+    
+    public Boolean isLinkQueueExclusive()
+    {
+        return addressProps.getBoolean(getFQN(LINK,X_DECLARE,EXCLUSIVE));
+    }
+    
+    public Boolean isLinkQueueAutoDelete()
+    {
+        return addressProps.getBoolean(getFQN(LINK,X_DECLARE,AUTO_DELETE));
+    }
+    
+    public String getLinkQueueAltExchange()
+    {
+       return addressProps.getString(getFQN(LINK,X_DECLARE,ALT_EXCHANGE));
+    }
+
+    public Boolean isSubscriptionExclusive()
+    {
+       return addressProps.getBoolean(getFQN(LINK,X_SUBSCRIBE,EXCLUSIVE));
+    }
+    
+    public Map getSubscriptionArguments()
+    {
+       Map m = addressProps.getMap(getFQN(LINK,X_SUBSCRIBE,ARGUMENTS));
+       return m == null ? Collections.EMPTY_MAP : m;
+    }
+    
+    @SuppressWarnings("unchecked")
+    private List<Binding> getBindings(List<Map> bindingList)
+    {
+        List<Binding> bindings = new ArrayList<Binding>();
+        if (bindingList != null)
+        {
+            for (Map map : bindingList)
+            {
+                MapAccessor bindingMap = new MapAccessor(map);
+                Binding binding = new Binding(
+                        bindingMap.getString(EXCHANGE),
+                        bindingMap.getString(QUEUE),
+                        bindingMap.getString(KEY),
+                        bindingMap.getMap(ARGUMENTS) == null ? Collections.EMPTY_MAP
+                                : bindingMap.getMap(ARGUMENTS));
+                bindings.add(binding);
+            }
+        }
+        return bindings;
+    }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressProperty_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressProperty_0_10.java
new file mode 100644
index 0000000..8d46c29
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressProperty_0_10.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+public final class AddressProperty_0_10
+{
+    public static final String X_DECLARE = "x-declare";
+    public static final String X_BINDINGS = "x-bindings";
+    public static final String X_SUBSCRIBE = "x-subscribes";
+    
+    public static final String EXCLUSIVE = "exclusive";
+    public static final String AUTO_DELETE = "auto-delete";
+    public static final String ALT_EXCHANGE = "alternate-exchange";
+    public static final String EXCHANGE_TYPE = "type";
+    
+    public static final String BINDINGS = "bindings";    
+    public static final String EXCHANGE = "exchange";
+    public static final String QUEUE = "queue";
+    public static final String KEY = "key";
+    public static final String ARGUMENTS = "arguments";    
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressResolver_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressResolver_0_10.java
new file mode 100644
index 0000000..acd5fbb
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/AddressResolver_0_10.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.Address.AddressType;
+import org.apache.qpid.messaging.Address.PolicyType;
+import org.apache.qpid.messaging.QpidDestination;
+import org.apache.qpid.messaging.address.AddressException;
+import org.apache.qpid.messaging.address.AddressResolver;
+import org.apache.qpid.transport.ExchangeBoundResult;
+import org.apache.qpid.transport.Session;
+
+public class AddressResolver_0_10 implements AddressResolver 
+{
+    Session ssn;
+    
+    public AddressResolver_0_10(Session session)
+    {
+        this.ssn = session;
+    }
+
+    /**
+     * 1. Check if it's an exchange (topic) or a queue
+     * 2. Create a QpidTopic or a QpidQueue based on that 
+     */
+    public QpidDestination resolve(Address address) throws AddressException
+    {
+        AddressHelper_0_10 helper = new AddressHelper_0_10(address);
+        
+        if (!address.isResolved())
+        {
+            checkAddressType(address,helper);
+            address.setResolved(true);
+        }
+        
+        Link_0_10 link = new Link_0_10(helper);
+        Node_0_10 node = (address.getAddressType() == AddressType.TOPIC_ADDRESS) ?
+                         new ExchangeNode(helper) : new QueueNode(helper);
+             
+        try
+        {
+            QpidDestination dest = (QpidDestination) ((address.getAddressType() == AddressType.TOPIC_ADDRESS) ?
+                            new QpidTopic_0_10(address,(ExchangeNode)node,link):
+                            new QpidQueue_0_10(address,(QueueNode)node,link));
+            
+            return dest;
+        }
+        catch(Exception e)
+        {
+            throw new AddressException("Error creating destination impl",e);
+        }        
+    }
+    
+    private void checkAddressType(Address address,AddressHelper_0_10 helper) throws AddressException 
+    {
+        ExchangeBoundResult result = ssn.exchangeBound(address.getName(),address.getName(),
+                                                      null,null).get();
+        if (result.getQueueNotFound() && result.getExchangeNotFound()) {
+            //neither a queue nor an exchange exists with that name; 
+            //treat it as a queue unless a type is specified.
+            if (helper.getNodeType() == AddressType.UNSPECIFIED)
+            {
+                address.setAddressType(AddressType.QUEUE_ADDRESS);
+            }
+            else
+            {
+                address.setAddressType(AddressType.TOPIC_ADDRESS);
+            }
+        } 
+        else if (result.getExchangeNotFound()) 
+        {
+            //name refers to a queue
+            address.setAddressType(AddressType.QUEUE_ADDRESS);
+        }
+        else if (result.getQueueNotFound()) 
+        {
+            //name refers to an exchange
+            address.setAddressType(AddressType.TOPIC_ADDRESS);
+        } 
+        else 
+        {
+            //both a queue and exchange exist for that name
+            if (helper.getNodeType() == AddressType.UNSPECIFIED)
+            {
+                throw new AddressException("Ambiguous address, please specify a node type. Ex type:{queue|topic}");
+            }
+        }
+    }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Binding.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Binding.java
new file mode 100644
index 0000000..46c789a
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Binding.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import java.util.Map;
+
+public class Binding
+{
+    String exchange;
+    String bindingKey;
+    String queue;
+    Map<String,Object> args;
+    
+    public Binding(String exchange,
+                   String queue,
+                   String bindingKey,
+                   Map<String,Object> args)
+    {
+        this.exchange = exchange;
+        this.queue = queue;
+        this.bindingKey = bindingKey;
+        this.args = args;
+    }
+    
+    public String getExchange() 
+    {
+        return exchange;
+    }
+
+    public String getQueue() 
+    {
+        return queue;
+    }
+    
+    public String getBindingKey() 
+    {
+        return bindingKey;
+    }
+    
+    public Map<String, Object> getArgs() 
+    {
+        return args;
+    }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/ExchangeNode.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/ExchangeNode.java
new file mode 100644
index 0000000..633fd3e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/ExchangeNode.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+
+public class ExchangeNode extends Node_0_10 
+{
+    private String exchangeType;
+    
+    public ExchangeNode(AddressHelper_0_10 helper)
+    {
+        super(helper);
+        exchangeType = helper.getNodeExchangeType();
+    }
+    
+    public String getExchangeType() 
+    {
+        return exchangeType;
+    }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Link_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Link_0_10.java
new file mode 100644
index 0000000..c8df601
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Link_0_10.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.messaging.address.AddressException;
+import org.apache.qpid.messaging.address.Link;
+
+public class Link_0_10 extends Link 
+{
+    private Subscription subscription;
+    private List<Binding> bindings;
+    
+    private Boolean autoDelete = null;    
+    private String altExchange;
+    private Boolean exclusive = null;
+    private Map<String,Object> declareArgs;    
+    
+    @SuppressWarnings("unchecked")
+    public Link_0_10(AddressHelper_0_10 helper) throws AddressException
+    {
+        super(helper);
+        autoDelete = helper.isLinkQueueAutoDelete();
+        exclusive = helper.isLinkQueueExclusive();
+        altExchange = helper.getLinkQueueAltExchange();
+        declareArgs = helper.getLinkQueueDeclareArgs();
+        bindings = helper.getLinkQueueBindings();
+        subscription = new Subscription();
+        subscription.setExclusive(helper.isSubscriptionExclusive());
+        subscription.setArgs(helper.getSubscriptionArguments());
+    }
+    
+    public List<Binding> getQueueBindings() 
+    {
+        return bindings;
+    }
+
+    public Boolean isQueueAutoDelete() 
+    {
+        return autoDelete;
+    }
+
+    public String getQueueAltExchange() 
+    {
+        return altExchange;
+    }
+
+    public Boolean isQueueExclusive() 
+    {
+        return exclusive;
+    }
+
+    public Map<String, Object> getQueueDeclareArgs() 
+    {
+        return declareArgs;
+    }
+
+    
+    public Subscription getSubscription()
+    {
+        return this.subscription;
+    }    
+    
+    public static class Subscription
+    {
+        private Map<String,Object> args = new HashMap<String,Object>();        
+        private Boolean exclusive = null;
+        
+        public Map<String, Object> getArgs()
+        {
+            return args;
+        }
+        
+        public void setArgs(Map<String, Object> args)
+        {
+            this.args = args;
+        }
+        
+        public Boolean isExclusive()
+        {
+            return exclusive;
+        }
+        
+        public void setExclusive(Boolean exclusive)
+        {
+            this.exclusive = exclusive;
+        }
+    }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Node_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Node_0_10.java
new file mode 100644
index 0000000..f97b8c1
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/Node_0_10.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import java.util.Map;
+
+import org.apache.qpid.messaging.address.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Node_0_10 extends Node 
+{
+    private static final Logger _logger = LoggerFactory.getLogger(Node_0_10.class);
+    
+    private boolean autoDelete = false;    
+    private String altExchange;
+    private Map<String,Object> declareArgs;
+             
+    public Node_0_10(AddressHelper_0_10 helper)
+    {
+        super(helper);
+        declareArgs = helper.getNodeDeclareArgs();  
+        autoDelete = helper.isNodeAutoDelete();
+        altExchange = helper.getNodeAltExchange();
+    }
+    
+    public boolean isAutoDelete() 
+    {
+        return autoDelete;
+    }
+
+    public String getAltExchange() 
+    {
+        return altExchange;
+    }
+
+    public Map<String, Object> getDeclareArgs() 
+    {
+        return declareArgs;
+    }
+
+    public boolean matchProps(Map<String,Object> target)
+    {
+        boolean match = true;
+        Map<String,Object> source = declareArgs;
+        for (String key: source.keySet())
+        {
+            match = target.containsKey(key) && 
+                    target.get(key).equals(source.get(key));
+            
+            if (!match) 
+            { 
+                StringBuffer buf = new StringBuffer();
+                buf.append("Property given in address did not match with the args sent by the broker.");
+                buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, ");
+                buf.append(" Actual { ").append(key).append(" : ").append(target.get(key)).append(" }");
+                _logger.debug(buf.toString());
+                return match;
+            }
+        }
+        
+        return match;
+    }
+}
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java
new file mode 100644
index 0000000..4c2f94a
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidQueue_0_10.java
@@ -0,0 +1,253 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import java.util.Map;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.QpidDestination.CheckMode;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SubscriptionSettings;
+import org.apache.qpid.messaging.address.AddressException;
+import org.apache.qpid.messaging.address.AddressHelper;
+import org.apache.qpid.messaging.address.Link;
+import org.apache.qpid.messaging.address.Link.Reliability;
+import org.apache.qpid.messaging.amqp_0_10.Session_0_10;
+import org.apache.qpid.messaging.QpidQueue;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.QueueQueryResult;
+import org.apache.qpid.transport.SessionException;
+
+public class QpidQueue_0_10 extends QpidQueue 
+{
+    private Address address;
+    private QueueNode queue;
+    private Link_0_10 link;
+    
+    public QpidQueue_0_10(Address address,QueueNode queue,Link_0_10 link) throws Exception
+    {
+        this.address = address;
+        this.queue = queue;
+        this.link = link;
+        queueName = address.getName();
+    }
+    
+    @Override
+    public void checkCreate(Session session,CheckMode mode) throws Exception 
+    {
+        org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+        
+        if (AddressHelper.isAllowed(queue.getCreatePolicy(), mode))
+        {                
+            ssn.queueDeclare(queueName, 
+                             queue.getAltExchange(), 
+                             queue.getDeclareArgs(),
+                             queue.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+                             queue.isDurable() ? Option.DURABLE : Option.NONE,
+                             queue.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+            
+            if (queue.getBindings().size() > 0)
+            {
+                for (Binding binding: queue.getBindings())
+                {
+                    String queue = binding.getQueue() == null?
+                                   queueName: binding.getQueue();
+                            
+                    String exchange = binding.getExchange() == null ? 
+                                      "anq.topic" :
+                                      binding.getExchange();
+    
+                    ssn.exchangeBind(queue, 
+                                     exchange,
+                                     binding.getBindingKey(),
+                                     binding.getArgs()); 
+                }
+            }
+        }
+        else
+        {
+            try
+            {
+                ssn.queueDeclare(queueName, null, null,Option.PASSIVE);
+                ssn.sync();
+            }
+            catch(SessionException e)
+            {
+                if (e.getException() != null 
+                    && e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND)
+                {
+                    throw new AddressException("The Queue '" + queueName +"' does not exist",e);
+                }
+                else
+                {
+                    throw e;
+                }
+            }    
+        }
+    }
+
+    @Override
+    public void checkAssert(Session session,CheckMode mode) throws Exception 
+    {
+        org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+        
+        if (AddressHelper.isAllowed(queue.getAssertPolicy(), mode))
+        {
+            boolean match = false;
+            try
+            {
+                QueueQueryResult result = ssn.queueQuery(queueName, Option.NONE).get();
+                
+                match = queueName.equals(result.getQueue()) &&
+                (result.getDurable() == queue.isDurable()) && 
+                (result.getAutoDelete() == queue.isAutoDelete()) &&
+                (result.getExclusive() == queue.isExclusive()) &&
+                (queue.matchProps(result.getArguments()));
+            }
+            catch(SessionException e)
+            {
+                if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED)
+                {
+                    match = false;
+                }
+                else
+                {
+                    throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()),
+                            "Error querying queue",e);
+                }
+            }        
+            if (!match)
+            {
+                throw new AddressException("The queue described in the address does not exist on the broker");
+            }
+        }
+    }
+
+    @Override
+    public void checkDelete(Session session,CheckMode mode) throws Exception 
+    {
+        org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+        
+        if (AddressHelper.isAllowed(queue.getDeletePolicy(), mode))
+        {
+            ssn.queueDelete(queueName, Option.NONE);
+            ssn.sync();
+        }
+    }
+
+    @Override
+    public void createSubscription(Session session,SubscriptionSettings settings) throws Exception 
+    {
+        org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+        
+        if (link.getQueueBindings().size() > 0)
+        {
+            for (Binding binding: link.getQueueBindings())
+            {
+                String queue = binding.getQueue() == null?
+                               queueName: binding.getQueue();
+                        
+                String exchange = binding.getExchange() == null ? 
+                                  "amq.topic" :
+                                  binding.getExchange();
+
+                ssn.exchangeBind(queue, 
+                                 exchange,
+                                 binding.getBindingKey(),
+                                 binding.getArgs()); 
+            }
+        }
+        
+        SubscriptionSettings_0_10 settings_0_10 = (SubscriptionSettings_0_10)settings;
+        Map<String,Object> arguments = settings_0_10.getArgs();
+        arguments.putAll((Map<? extends String, ? extends Object>)link.getSubscription().getArgs()); 
+                
+        if (link.getReliability() == Reliability.UNRELIABLE || 
+            link.getReliability() == Reliability.AT_MOST_ONCE)
+        {
+            settings_0_10.setAcceptMode(MessageAcceptMode.NONE);
+        }
+        
+        // for queues subscriptions are non exclusive by default.
+        boolean exclusive = 
+            (link.getSubscription().isExclusive() == null) ? false : link.getSubscription().isExclusive(); 
+        
+        ssn.messageSubscribe(queueName, 
+                             settings_0_10.getSubscriptionTag(),
+                             settings_0_10.getAcceptMode(),
+                             settings_0_10.getAccquireMode(),
+                             null,  // resume id
+                             0, // resume ttl
+                             arguments,
+                             exclusive ? Option.EXCLUSIVE : Option.NONE);
+    }
+
+    @Override
+    public void deleteSubscription(Session session) throws Exception 
+    {
+        org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+        
+        if (link.getQueueBindings().size() > 0)
+        {
+            for (Binding binding: link.getQueueBindings())
+            {
+                String queue = binding.getQueue() == null?
+                               queueName: binding.getQueue();
+                        
+                String exchange = binding.getExchange() == null ? 
+                                  "amq.topic" :
+                                  binding.getExchange();
+
+                ssn.exchangeUnbind(queue, 
+                                   exchange,
+                                   binding.getBindingKey(),
+                                   Option.NONE); 
+            }
+        }        
+        
+        // ideally we should cancel the subscription here 
+    }
+    
+    public String getSubscriptionQueue()
+    {
+        return queueName;
+    }
+    
+    public long getConsumerCapacity()
+    {
+        return link.getConsumerCapacity();
+    }
+    
+    public long getProducerCapacity()
+    {
+        return link.getProducerCapacity();
+    }
+    
+    public String toString()
+    {
+        return address.toString();
+    }    
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java
new file mode 100644
index 0000000..b919522
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QpidTopic_0_10.java
@@ -0,0 +1,278 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SubscriptionSettings;
+import org.apache.qpid.messaging.Address.PolicyType;
+import org.apache.qpid.messaging.QpidDestination.CheckMode;
+import org.apache.qpid.messaging.QpidTopic;
+import org.apache.qpid.messaging.address.AddressException;
+import org.apache.qpid.messaging.address.AddressHelper;
+import org.apache.qpid.messaging.address.Link;
+import org.apache.qpid.messaging.address.Link.Reliability;
+import org.apache.qpid.messaging.amqp_0_10.Session_0_10;
+import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.SessionException;
+
+public class QpidTopic_0_10 extends QpidTopic 
+{
+    private String exchangeName;
+    private Session ssn;
+    private Address address;
+    private ExchangeNode exchange;
+    private Link_0_10 link;
+    private String subscriptionQueue;
+    
+    public QpidTopic_0_10(Address address,ExchangeNode exchange,Link_0_10 link) throws Exception
+    {
+        if (Reliability.AT_LEAST_ONCE == link.getReliability())
+        {
+            throw new Exception("AT-LEAST-ONCE is not yet supported for Topics");
+        }
+        
+        this.address = address;
+        this.exchange = exchange;
+        this.link = link;
+        this.exchangeName = address.getName();
+        topicName = retrieveTopicName();        
+    }
+
+    @Override
+    public void checkCreate(Session session,CheckMode mode) throws Exception 
+    {
+        org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+        
+        if (AddressHelper.isAllowed(exchange.getCreatePolicy(), mode))
+        {
+            ssn.exchangeDeclare(exchangeName, 
+                                exchange.getExchangeType(),
+                                exchange.getAltExchange(), 
+                                exchange.getDeclareArgs(),
+                                exchangeName.toString().startsWith("amq.") ? 
+                                Option.PASSIVE : Option.NONE);
+        }
+        else
+        {
+            try
+            {
+                ssn.exchangeDeclare(exchangeName, null, null, null, Option.PASSIVE);
+                ssn.sync();
+            }
+            catch(SessionException e)
+            {
+                if (e.getException() != null 
+                    && e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND)
+                {
+                    throw new AddressException("The exchange '" + exchangeName +"' does not exist",e);
+                }
+                else
+                {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void checkAssert(Session session,CheckMode mode) throws Exception 
+    {
+        org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+        
+        if (AddressHelper.isAllowed(exchange.getAssertPolicy(), mode))
+        {
+            ExchangeQueryResult result = ssn.exchangeQuery(exchangeName, Option.NONE).get();
+            boolean match = !result.getNotFound() &&        
+            (result.getDurable() == exchange.isDurable()) && 
+            (exchange.getExchangeType() != null && exchange.getExchangeType().equals(result.getType())) &&
+            (exchange.matchProps(result.getArguments()));
+            
+            if (!match)
+            {
+                throw new AddressException("The exchange described by the address does not exist on the broker");
+            }
+        }
+    }
+
+    @Override
+    public void checkDelete(Session session,CheckMode mode) throws Exception 
+    {
+        org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+        
+        if (AddressHelper.isAllowed(exchange.getDeletePolicy(), mode) &&
+           !exchangeName.toString().startsWith("amq."))
+        {
+            ssn.exchangeDelete(exchangeName, Option.NONE);
+            ssn.sync();
+        }
+    }
+
+    @Override
+    public void createSubscription(Session session,SubscriptionSettings settings) throws Exception 
+    {
+        org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+        subscriptionQueue = getSubscriptionQueueName();
+        
+        // for topics subscriptions queues are exclusive by default.
+        boolean exclusive = 
+            (link.isQueueExclusive() == null) ? true : link.getSubscription().isExclusive();
+        
+        // for topics subscriptions are autoDelete by default.
+        boolean autoDelete = 
+            (link.getSubscription().isExclusive() == null) ? true : link.getSubscription().isExclusive();
+        
+        ssn.queueDeclare(subscriptionQueue,
+                        link.getQueueAltExchange(),
+                        link.getQueueDeclareArgs(),
+                        autoDelete ? Option.AUTO_DELETE : Option.NONE,
+                        link.isDurable() ? Option.DURABLE : Option.NONE,
+                        exclusive ? Option.EXCLUSIVE : Option.NONE);
+        
+        if (link.getQueueBindings().size() > 0)
+        {
+            for (Binding binding: link.getQueueBindings())
+            {
+                String queue = binding.getQueue() == null?
+                               subscriptionQueue: binding.getQueue();
+                        
+                String exchange = binding.getExchange() == null ? 
+                                  address.getName() :
+                                  binding.getExchange();
+
+                ssn.exchangeBind(queue, 
+                                 exchange,
+                                 binding.getBindingKey(),
+                                 binding.getArgs()); 
+            }
+        }
+        else
+        {
+            String subject = address.getSubject();
+            ssn.exchangeBind(subscriptionQueue, 
+                             address.getName(),
+                             subject == null || subject.trim().equals("") ? "#" : subject,
+                             null); 
+        }
+        
+        SubscriptionSettings_0_10 settings_0_10 = (SubscriptionSettings_0_10)settings;
+        Map<String,Object> arguments = settings_0_10.getArgs();
+        arguments.putAll((Map<? extends String, ? extends Object>)link.getSubscription().getArgs()); 
+        
+        if (link.getReliability() == Reliability.UNRELIABLE || 
+            link.getReliability() == Reliability.AT_MOST_ONCE)
+        {
+            settings_0_10.setAcceptMode(MessageAcceptMode.NONE);
+        }
+        
+        // for topics subscriptions are exclusive by default.
+        boolean exclusiveConsume = 
+            (link.getSubscription().isExclusive() == null) ? true : link.getSubscription().isExclusive();
+        
+        ssn.messageSubscribe(subscriptionQueue, 
+                             settings_0_10.getSubscriptionTag(),
+                             settings_0_10.getAcceptMode(),
+                             settings_0_10.getAccquireMode(),
+                             null,  // resume id
+                             0, // resume ttl
+                             arguments,
+                             exclusiveConsume ? Option.EXCLUSIVE : Option.NONE);
+    }
+
+    @Override
+    public void deleteSubscription(Session session) throws Exception 
+    {
+        org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
+        
+        if (link.getQueueBindings().size() > 0)
+        {
+            for (Binding binding: link.getQueueBindings())
+            {
+                String queue = binding.getQueue() == null?
+                        subscriptionQueue: binding.getQueue();
+                        
+                String exchange = binding.getExchange() == null ? 
+                                  "anq.topic" :
+                                  binding.getExchange();
+
+                ssn.exchangeUnbind(queue, 
+                                   exchange,
+                                   binding.getBindingKey(),
+                                   Option.NONE); 
+            }
+        }
+        ssn.queueDelete(subscriptionQueue, Option.NONE);
+    }
+    
+    private String getSubscriptionQueueName() 
+    {
+        if (link.getName() == null)
+        {
+            return "TempQueue" + UUID.randomUUID();
+        }
+        else
+        {
+            return link.getName();
+        }
+    }
+
+    private String retrieveTopicName() 
+    {
+        if (address.getSubject() != null && !address.getSubject().trim().equals(""))
+        {
+            return address.getSubject();
+        }
+        else if ("topic".equals(exchange.getExchangeType()))
+        {
+            return "#";
+        }
+        else
+        {
+            return "";
+        }
+    }
+    
+    public String getSubscriptionQueue()
+    {
+        return subscriptionQueue;
+    }
+        
+    public long getConsumerCapacity()
+    {
+        return link.getConsumerCapacity();
+    }
+    
+    public long getProducerCapacity()
+    {
+        return link.getProducerCapacity();
+    }
+    
+    public String toString()
+    {
+        return address.toString();
+    }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QueueNode.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QueueNode.java
new file mode 100644
index 0000000..04f0911
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/QueueNode.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import java.util.List;
+
+import org.apache.qpid.messaging.Address;
+
+public class QueueNode extends Node_0_10 
+{
+    private boolean exclusive = false;
+    private List<Binding> bindings;
+    
+    public QueueNode(AddressHelper_0_10 helper) 
+    {
+        super(helper);
+        exclusive = helper.isNodeExclusive();       
+        bindings = helper.getNodeBindings();
+    }
+
+    public boolean isExclusive()
+    {
+        return exclusive;
+    }
+    
+    public List<Binding> getBindings()
+    {
+        return bindings;
+    }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/SubscriptionSettings_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/SubscriptionSettings_0_10.java
new file mode 100644
index 0000000..cbb51c6
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/address/amqp_0_10/SubscriptionSettings_0_10.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.address.amqp_0_10;
+
+import java.util.Map;
+
+import org.apache.qpid.messaging.SubscriptionSettings;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+
+public class SubscriptionSettings_0_10 implements SubscriptionSettings 
+{
+    String messageSelector;
+    String subscriptionTag;
+    MessageAcceptMode acceptMode;
+    MessageAcquireMode accquireMode;
+    Map<String,Object> args;
+    
+    public String getMessageSelector() 
+    {
+        return messageSelector;
+    }
+    
+    public void setMessageSelector(String messageSelector) 
+    {
+        this.messageSelector = messageSelector;
+    }
+    
+    public String getSubscriptionTag() 
+    {
+        return subscriptionTag;
+    }
+    
+    public void setSubscriptionTag(String subscriptionTag) 
+    {
+        this.subscriptionTag = subscriptionTag;
+    }
+    
+    public MessageAcceptMode getAcceptMode() 
+    {
+        return acceptMode;
+    }
+    
+    public void setAcceptMode(MessageAcceptMode acceptMode) 
+    {
+        this.acceptMode = acceptMode;
+    }
+    
+    public MessageAcquireMode getAccquireMode() 
+    {
+        return accquireMode;
+    }
+    
+    public void setAccquireMode(MessageAcquireMode accquireMode) 
+    {
+        this.accquireMode = accquireMode;
+    }
+    
+    public Map<String, Object> getArgs() 
+    {
+        return args;
+    }
+    
+    public void setArgs(Map<String, Object> args) 
+    {
+        this.args = args;
+    }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/messaging/amqp_0_10/Session_0_10.java b/java/client/src/main/java/org/apache/qpid/messaging/amqp_0_10/Session_0_10.java
new file mode 100644
index 0000000..ad611ac
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/messaging/amqp_0_10/Session_0_10.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.messaging.amqp_0_10;
+
+import org.apache.qpid.messaging.Session;
+
+/**
+ * Currently only a marker interface used primarily 
+ * for address destination refactoring QPID-3401
+ *
+ */
+public class Session_0_10 implements Session 
+{
+    private org.apache.qpid.transport.Session protocolSession;
+
+    public Session_0_10(org.apache.qpid.transport.Session ssn)
+    {
+        protocolSession = ssn;
+    }
+    
+    public org.apache.qpid.transport.Session getProtocolSession()
+    {
+        return protocolSession;
+    }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
index 8498272..674f16f 100644
--- a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
+++ b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
@@ -154,7 +154,7 @@
     public void testExceptionOnCreateConsumer()
     {
         AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10();
-        AMQAnyDestination destination = createDestination();
+        AMQDestination destination = createDestination();
         try
         {
             session.createConsumer(destination);
@@ -170,10 +170,9 @@
     public void testExceptionOnCreateSubscriber()
     {
         AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10();
-        AMQAnyDestination destination = createDestination();
         try
         {
-            session.createSubscriber(destination);
+            session.createSubscriber(new AMQTopic(new AMQShortString("amq.topic"),new AMQShortString("test")));
             fail("JMSException should be thrown");
         }
         catch (Exception e)
@@ -518,13 +517,13 @@
         assertNotNull("ExchangeDeclare event was not sent", event);
     }
 
-    private AMQAnyDestination createDestination()
+    private AMQDestination createDestination()
     {
-        AMQAnyDestination destination = null;
+        AMQDestination destination = null;
         try
         {
-            destination = new AMQAnyDestination(new AMQShortString("amq.direct"), new AMQShortString("direct"),
-                    new AMQShortString("test"), false, true, new AMQShortString("test"), true, null);
+            destination = new AMQQueue(new AMQShortString("amq.direct"), new AMQShortString("direct"),
+                    new AMQShortString("test"),new AMQShortString("test"), false, true, true, null);
         }
         catch (Exception e)
         {
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java b/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java
index dc5b69d..d154f70 100644
--- a/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java
+++ b/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java
@@ -26,6 +26,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -36,6 +37,8 @@
     public Integer getInt(String name);
     public Long getLong(String name);
     public String getString(String name);
+    public Map getMap(String name);
+    public List getList(String name);
     
     static class SystemPropertyAccessor implements Accessor
     {
@@ -58,6 +61,10 @@
         {
             return System.getProperty(name);
         }
+        
+        public Map getMap(String name){ throw new UnsupportedOperationException("Not supported by system properties"); }
+        
+        public List getList(String name){ throw new UnsupportedOperationException("Not supported by system properties"); }
     }
     
     static class MapAccessor implements Accessor
@@ -144,6 +151,31 @@
                 return null;
             }
         }
+                             
+         public Map getMap(String name)
+         { 
+             if (source != null && source.containsKey(name) && source.get(name) instanceof Map)
+             {
+                 return (Map)source.get(name);
+             }
+             else
+             {
+                 return null;
+             } 
+         }
+         
+         public List getList(String name)
+         { 
+             if (source != null && source.containsKey(name) && source.get(name) instanceof List)
+             {
+                 return (List)source.get(name);
+             }
+             else
+             {
+                 return null;
+             } 
+         }
+
     }  
     
     static class PropertyFileAccessor extends MapAccessor
@@ -163,6 +195,12 @@
             }
             source = props;
         }
+        
+        @Override
+        public Map getMap(String name){ throw new UnsupportedOperationException("Not supported by property file"); }
+        
+        @Override
+        public List getList(String name){ throw new UnsupportedOperationException("Not supported by property file"); }
     }
     
     static class CombinedAccessor implements Accessor
@@ -190,7 +228,7 @@
         {
             for (Accessor accessor: accessors)
             {
-                if (accessor.getBoolean(name) != null)
+                if (accessor.getInt(name) != null)
                 {
                     return accessor.getInt(name);
                 }
@@ -202,7 +240,7 @@
         {
             for (Accessor accessor: accessors)
             {
-                if (accessor.getBoolean(name) != null)
+                if (accessor.getLong(name) != null)
                 {
                     return accessor.getLong(name);
                 }
@@ -214,13 +252,37 @@
         {
             for (Accessor accessor: accessors)
             {
-                if (accessor.getBoolean(name) != null)
+                if (accessor.getString(name) != null)
                 {
                     return accessor.getString(name);
                 }
             }
             return null;
         }
+        
+         public Map getMap(String name)
+         {
+             for (Accessor accessor: accessors)
+             {
+                 if (accessor.getMap(name) != null && accessor.getMap(name) instanceof Map)
+                 {
+                     return accessor.getMap(name);
+                 }
+             }
+             return null;
+         }
+         
+         public List getList(String name)
+         {
+             for (Accessor accessor: accessors)
+             {
+                 if (accessor.getMap(name) != null && accessor.getList(name) instanceof List)
+                 {
+                     return accessor.getList(name);
+                 }
+             }
+             return null;
+         }
     }
     
     static class ValidationAccessor implements Accessor
@@ -269,5 +331,97 @@
             }
             return v;
         }
+        
+        public Map getMap(String name){ throw new UnsupportedOperationException("Validator interface does not support maps"); }
+        
+        public List getList(String name){ throw new UnsupportedOperationException("Validator interface does not support maps"); }
     }
+    
+      /**
+       * Property names as passed in the form
+       * level_1_prop/level_2_prop/.../level_n_prop
+       * All property name upto level_n-1_prop should return
+       * a map or null
+       */
+      static class NestedMapAccessor implements Accessor
+      {   
+          protected Map<Object,Object> baseMap;
+          
+          public NestedMapAccessor(Map<Object,Object> map)
+          {
+              baseMap = map;
+          }
+          
+          private String getKey(String name)
+          {
+              if (name.lastIndexOf("/") > -1)
+              {
+                  return name.substring(name.lastIndexOf("/")+1);
+              }
+              else
+              {
+                  return name;
+              }
+          }
+      
+          private MapAccessor mapIterator(String name)
+          {
+              if (name.lastIndexOf("/") == -1)
+              {
+                  return new MapAccessor(baseMap);
+              }
+             
+              String[] paths = name.substring(0,name.lastIndexOf("/")).split("/");            
+              Map map = baseMap == null ? Collections.EMPTY_MAP : baseMap;
+              
+              for (String path:paths)
+              {
+                  
+                  Object obj = map.get(path);
+                  if (obj == null)
+                  {
+                      return new MapAccessor(null);
+                  }
+                  else if (obj instanceof Map)
+                  {
+                      map = (Map)obj;
+                  }
+                  else  
+                  {
+                      throw new IllegalArgumentException(path + " doesn't retrieve another map");
+                  }
+              }            
+              return new MapAccessor(map);
+          }
+          
+          public Boolean getBoolean(String name)
+          {
+              return mapIterator(name).getBoolean(getKey(name));
+          }
+          
+          public Integer getInt(String name)
+          {
+              return mapIterator(name).getInt(getKey(name));
+          }
+          
+          public Long getLong(String name)
+          {
+              return mapIterator(name).getLong(getKey(name));
+          }
+          
+          public String getString(String name)
+          {
+              return mapIterator(name).getString(getKey(name));
+          }
+          
+          public Map getMap(String name)
+          { 
+              return mapIterator(name).getMap(getKey(name)); 
+          }
+          
+          public List getList(String name)
+          { 
+              return mapIterator(name).getList(getKey(name)); 
+          }
+      }
 }
diff --git a/java/common/src/main/java/org/apache/qpid/messaging/Address.java b/java/common/src/main/java/org/apache/qpid/messaging/Address.java
index 2c7fe7b..ce8734f 100644
--- a/java/common/src/main/java/org/apache/qpid/messaging/Address.java
+++ b/java/common/src/main/java/org/apache/qpid/messaging/Address.java
@@ -34,6 +34,21 @@
 
 public class Address
 {
+  public enum AddressType {QUEUE_ADDRESS, TOPIC_ADDRESS, UNSPECIFIED };
+  
+  public enum PolicyType 
+  {
+      ALWAYS, NEVER, SENDER, RECEIVER;
+      public static PolicyType getPolicyType(String str)
+      {
+          if ( str == null || str.equals("") || "never".equals(str)) return PolicyType.NEVER;
+          if ("always".equals(str)) return PolicyType.ALWAYS;
+          else if ("sender".equals(str)) return PolicyType.SENDER;
+          else if ("receiver".equals(str)) return PolicyType.RECEIVER;
+          else throw new IllegalArgumentException(str + " is not an allowed value");
+      }
+  }
+
 
     public static Address parse(String address)
     {
@@ -43,6 +58,8 @@
     private String name;
     private String subject;
     private Map options;
+    private AddressType type = AddressType.QUEUE_ADDRESS;
+    private boolean resolved = false;
 
     public Address(String name, String subject, Map options)
     {
@@ -50,7 +67,27 @@
         this.subject = subject;
         this.options = options;
     }
-
+    
+    public AddressType getAddressType()
+    {
+        return type;
+    }
+  
+    public void setAddressType(AddressType type)
+    {
+        this.type = type;
+    }
+  
+    public boolean isResolved()
+    {
+       return resolved;
+    }
+  
+    public void setResolved(boolean b)
+    {
+       this.resolved = b;
+    }
+  
     public String getName()
     {
         return name;
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 8c3c247..c07178d 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -29,6 +29,7 @@
 
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -44,14 +45,14 @@
 import javax.naming.Context;
 import javax.naming.InitialContext;
 
-import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.client.AddressBasedDestination;
+import org.apache.qpid.client.AddressBasedQueue;
+import org.apache.qpid.client.AddressBasedTopic;
 import org.apache.qpid.client.message.QpidMessageProperties;
-import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
 import org.apache.qpid.messaging.Address;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -69,6 +70,16 @@
     {
         super.setUp();
         _connection = getConnection() ;
+        _connection.setExceptionListener(new ExceptionListener()
+        {
+
+            @Override
+            public void onException(JMSException ex) 
+            {
+                // ignore                
+            }
+            
+        });
         _connection.start();
     }
     
@@ -79,6 +90,18 @@
         super.tearDown();
     }
     
+    // Currently if we get a session exception the connection is canned.
+    private void recreateConnection() throws Exception
+    {
+        _connection = getConnection() ;
+        _connection.start();
+    }
+    
+    private AddressBasedDestination getDestination(String addr) throws Exception
+    {
+        return (AddressBasedDestination)AMQDestination.createDestination(addr);
+    }
+    
     public void testCreateOptions() throws Exception
     {
         Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
@@ -88,118 +111,140 @@
         // default (create never, assert never) -------------------
         // create never --------------------------------------------
         String addr1 = "ADDR:testQueue1";
-        AMQDestination  dest = new AMQAnyDestination(addr1);
+        AddressBasedDestination  dest = getDestination(addr1);
         try
         {
             cons = jmsSession.createConsumer(dest);
+            fail("Exception should have been thrown as the queue does not exist");
         }
         catch(JMSException e)
         {
-            assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " +
-                    "doesn't resolve to an exchange or a queue"));
-        }
+            assertTrue(e.getCause().getMessage().contains("The Queue 'testQueue1' does not exist"));
+            recreateConnection();
+            jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);            
+        }       
         
         try
         {
             prod = jmsSession.createProducer(dest);
+            fail("Exception should have been thrown as the queue does not exist");
         }
         catch(JMSException e)
         {
-            assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue1' supplied in the address " +
-                    "doesn't resolve to an exchange or a queue"));
+            e.printStackTrace();
+            assertTrue(e.getCause().getCause().getCause().getMessage().contains("The Queue 'testQueue1' does not exist"));
+            recreateConnection();
+            jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);            
         }
             
         assertFalse("Queue should not be created",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest, (QueueNode)dest.getSourceNode() ,true));
+                (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName()));
         
         
         // create always -------------------------------------------
         addr1 = "ADDR:testQueue1; { create: always }";
-        dest = new AMQAnyDestination(addr1);
+        dest = getDestination(addr1);
         cons = jmsSession.createConsumer(dest); 
         
         assertTrue("Queue not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));              
+                (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName()));              
         assertTrue("Queue not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("", 
-                    dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
+                    dest.getAddress().getName(),dest.getAddress().getName(),null));
         
         // create receiver -----------------------------------------
         addr1 = "ADDR:testQueue2; { create: receiver }";
-        dest = new AMQAnyDestination(addr1);
+        dest = getDestination(addr1);
         try
         {
             prod = jmsSession.createProducer(dest);
+            fail("Exception should have been thrown as the queue does not exist");
         }
         catch(JMSException e)
         {
-            assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue2' supplied in the address " +
-                    "doesn't resolve to an exchange or a queue"));
+            assertTrue(e.getCause().getCause().getCause().getMessage().contains("The Queue 'testQueue2' does not exist"));
+            jmsSession.close();
+            recreateConnection();
+            jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         }
-            
-        assertFalse("Queue should not be created",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
         
+        System.out.println("===========================================");
+        System.out.println("jmsSession current exception " + ((AMQSession_0_10)jmsSession).getCurrentException());    
+        System.out.println("===========================================");
+        
+        assertFalse("Queue should not be created",(
+                (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName()));
+        
+        
+        System.out.println("===========================================");
+        System.out.println("jmsSession current exception " + ((AMQSession_0_10)jmsSession).getCurrentException());    
+        System.out.println("===========================================");
         
         cons = jmsSession.createConsumer(dest); 
         
         assertTrue("Queue not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));              
+                (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName()));              
         assertTrue("Queue not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("", 
-                    dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
+                    dest.getAddress().getName(),dest.getAddress().getName(), null));
         
         // create never --------------------------------------------
         addr1 = "ADDR:testQueue3; { create: never }";
-        dest = new AMQAnyDestination(addr1);
+        dest = getDestination(addr1);
         try
         {
-            cons = jmsSession.createConsumer(dest); 
+            cons = jmsSession.createConsumer(dest);
+            fail("Exception should have been thrown as the queue does not exist");
         }
         catch(JMSException e)
         {
-            assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
-                    "doesn't resolve to an exchange or a queue"));
+            assertTrue(e.getCause().getMessage().contains("The Queue 'testQueue3' does not exist"));
+            recreateConnection();
+            jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         }
         
         try
         {
             prod = jmsSession.createProducer(dest);
+            fail("Exception should have been thrown as the queue does not exist");
         }
         catch(JMSException e)
         {
-            assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue3' supplied in the address " +
-                    "doesn't resolve to an exchange or a queue"));
+            assertTrue(e.getCause().getCause().getCause().getMessage().contains("The Queue 'testQueue3' does not exist"));
+            recreateConnection();
+            jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         }
             
         assertFalse("Queue should not be created",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+                (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName()));
         
         // create sender ------------------------------------------
         addr1 = "ADDR:testQueue3; { create: sender }";
-        dest = new AMQAnyDestination(addr1);
+        dest = getDestination(addr1);
                 
         try
         {
             cons = jmsSession.createConsumer(dest); 
+            fail("Exception should have been thrown as the queue does not exist");
         }
         catch(JMSException e)
         {
-            assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
-                    "doesn't resolve to an exchange or a queue"));
+            assertTrue(e.getCause().getMessage().contains("The Queue 'testQueue3' does not exist"));
+            recreateConnection();
+            jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         }
         assertFalse("Queue should not be created",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+                (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName()));
         
         prod = jmsSession.createProducer(dest);
         assertTrue("Queue not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));              
+                (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName()));              
         assertTrue("Queue not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("", 
-                    dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
+                    dest.getAddress().getName(),dest.getAddress().getName(), null));
         
     }
- 
+    
     public void testCreateQueue() throws Exception
     {
         Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
@@ -226,30 +271,30 @@
                                      
                             "}" +
                       "}";
-        AMQDestination dest = new AMQAnyDestination(addr);
+        AddressBasedDestination dest = getDestination(addr);
         MessageConsumer cons = jmsSession.createConsumer(dest); 
         cons.close();
         
         // Even if the consumer is closed the queue and the bindings should be intact.
         
         assertTrue("Queue not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));              
+                (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName()));              
         
         assertTrue("Queue not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("", 
-                    dest.getAddressName(),dest.getAddressName(), null));
+                        dest.getAddress().getName(),dest.getAddress().getName(), null));
         
         assertTrue("Queue not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", 
-                    dest.getAddressName(),"test", null));
+                        dest.getAddress().getName(),"test", null));
         
         assertTrue("Queue not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout", 
-                    dest.getAddressName(),null, null));
+                        dest.getAddress().getName(),null, null));
         
         assertTrue("Queue not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", 
-                    dest.getAddressName(),"a.#", null));   
+                        dest.getAddress().getName(),"a.#", null));   
         
         Map<String,Object> args = new HashMap<String,Object>();
         args.put("x-match","any");
@@ -257,7 +302,7 @@
         args.put("loc","CA");
         assertTrue("Queue not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("amq.match", 
-                    dest.getAddressName(),null, args));
+                        dest.getAddress().getName(),null, args));
         
         MessageProducer prod = jmsSession.createProducer(dest);
         prod.send(jmsSession.createTextMessage("test"));
@@ -312,7 +357,7 @@
                         "}" +
                       "}";
         
-        AMQDestination dest = new AMQAnyDestination(addr);
+        AddressBasedDestination dest = getDestination(addr);
 
         MessageConsumer cons;
         try
@@ -338,7 +383,7 @@
         }
         
         assertTrue("Exchange not created as expected",(
-                (AMQSession_0_10)jmsSession).isExchangeExist(dest, (ExchangeNode)dest.getTargetNode() , true));
+                (AMQSession_0_10)jmsSession).isExchangeExist(dest.getAddress().getName()));
        
         // The existence of the queue is implicitly tested here
         assertTrue("Queue not bound as expected",(
@@ -346,7 +391,7 @@
                     dest.getQueueName(),"hello", Collections.<String, Object>emptyMap()));
         
         // The client should be able to query and verify the existence of my-exchange (QPID-2774)
-        dest = new AMQAnyDestination("ADDR:my-exchange; {create: never}");
+        dest = getDestination("ADDR:my-exchange; {create: never}");
         cons = jmsSession.createConsumer(dest); 
     }
     
@@ -376,27 +421,27 @@
         return argsString;
     }
 
-    public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception
+    public void checkQueueForBindings(Session jmsSession, AddressBasedDestination dest,String headersBinding) throws Exception
     {
-    	assertTrue("Queue not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));              
+        assertTrue("Queue not created as expected",(
+                (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName()));              
         
         assertTrue("Queue not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("", 
-                    dest.getAddressName(),dest.getAddressName(), null));
+                        dest.getAddress().getName(),dest.getAddress().getName(), null));
         
         assertTrue("Queue not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", 
-                    dest.getAddressName(),"test", null));  
+                        dest.getAddress().getName(),"test", null));  
       
         assertTrue("Queue not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", 
-                    dest.getAddressName(),"a.#", null));
+                        dest.getAddress().getName(),"a.#", null));
         
         Address a = Address.parse(headersBinding);
         assertTrue("Queue not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("amq.match", 
-                    dest.getAddressName(),null, a.getOptions()));
+                        dest.getAddress().getName(),null, a.getOptions()));
     }
     
     /**
@@ -406,7 +451,7 @@
     public void testBindQueueWithArgs() throws Exception
     {
         
-    	Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}";
         
         String addr = "node: "  + 
@@ -425,11 +470,11 @@
                       "}";
 
         
-        AMQDestination dest1 = new AMQAnyDestination("ADDR:my-queue/hello; {create: receiver, " +addr);
+        AddressBasedDestination dest1 = getDestination("ADDR:my-queue/hello; {create: receiver, " +addr);
         MessageConsumer cons = jmsSession.createConsumer(dest1); 
         checkQueueForBindings(jmsSession,dest1,headersBinding);       
         
-        AMQDestination dest2 = new AMQAnyDestination("ADDR:my-queue2/hello; {create: sender, " +addr);
+        AddressBasedDestination dest2 = getDestination("ADDR:my-queue2/hello; {create: sender, " +addr);
         MessageProducer prod = jmsSession.createProducer(dest2); 
         checkQueueForBindings(jmsSession,dest2,headersBinding);     
     }
@@ -467,7 +512,7 @@
         
         Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
         
-        AMQDestination dest = new AMQAnyDestination(address);
+        AddressBasedDestination dest = getDestination(address);
         MessageConsumer cons = jmsSession.createConsumer(dest); 
         MessageProducer prod = jmsSession.createProducer(dest);
         
@@ -508,8 +553,8 @@
         PropertiesFileInitialContextFactory props = new PropertiesFileInitialContextFactory();
         Context ctx = props.getInitialContext(map);
         
-        AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1");      
-        AMQDestination dest2 = (AMQDestination)ctx.lookup("myQueue2");
+        AddressBasedDestination dest1 = (AddressBasedDestination)ctx.lookup("myQueue1");      
+        AddressBasedDestination dest2 = (AddressBasedDestination)ctx.lookup("myQueue2");
         AMQDestination dest3 = (AMQDestination)ctx.lookup("myQueue3");
         
         Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
@@ -518,25 +563,25 @@
         MessageConsumer cons3 = jmsSession.createConsumer(dest3);
         
         assertTrue("Destination1 was not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest1,(QueueNode)dest1.getSourceNode(), true));              
+                (AMQSession_0_10)jmsSession).isQueueExist(dest1.getQueueName()));              
         
         assertTrue("Destination1 was not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("", 
-                    dest1.getAddressName(),dest1.getAddressName(), null));
+                        dest1.getAddress().getName(),dest1.getAddress().getName(), null));
         
         assertTrue("Destination2 was not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest2,(QueueNode)dest2.getSourceNode(), true));              
+                (AMQSession_0_10)jmsSession).isQueueExist(dest2.getQueueName()));              
         
         assertTrue("Destination2 was not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("", 
-                    dest2.getAddressName(),dest2.getAddressName(), null));
+                        dest2.getAddress().getName(),dest2.getAddress().getName(), null));
         
         MessageProducer producer = jmsSession.createProducer(dest3);
         producer.send(jmsSession.createTextMessage("Hello"));
         TextMessage msg = (TextMessage)cons3.receive(1000);
         assertEquals("Destination3 was not created as expected.",msg.getText(),"Hello");
     }
-
+    
     /**
      * Test goal: Verifies the subject can be overridden using "qpid.subject" message property.
      * Test strategy: Creates and address with a default subject "topic1"
@@ -547,7 +592,7 @@
     {
         Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
         
-        AMQDestination topic1 = new AMQAnyDestination("ADDR:amq.topic/topic1; {link:{name: queue1}}");
+        AddressBasedDestination topic1 = getDestination("ADDR:amq.topic/topic1; {link:{name: queue1}}");
         
         MessageProducer prod = jmsSession.createProducer(topic1);
         
@@ -555,7 +600,7 @@
         m.setStringProperty("qpid.subject", "topic2");
         
         MessageConsumer consForTopic1 = jmsSession.createConsumer(topic1);
-        MessageConsumer consForTopic2 = jmsSession.createConsumer(new AMQAnyDestination("ADDR:amq.topic/topic2; {link:{name: queue2}}"));
+        MessageConsumer consForTopic2 = jmsSession.createConsumer(getDestination("ADDR:amq.topic/topic2; {link:{name: queue2}}"));
         
         prod.send(m);
         Message msg = consForTopic1.receive(1000);
@@ -591,14 +636,15 @@
         queue = ssn.createQueue("ADDR:my-queue2");
         try
         {
-        	prod = ssn.createProducer(queue);
-        	fail("The client should throw an exception, since there is no queue present in the broker");
+            prod = ssn.createProducer(queue);
+            fail("The client should throw an exception, since there is no queue present in the broker");
         }
         catch(Exception e)
         {
-        	String s = "The name 'my-queue2' supplied in the address " +
-        			"doesn't resolve to an exchange or a queue";
-        	assertEquals(s,e.getCause().getCause().getMessage());
+            String s = "The Queue 'my-queue2' does not exist";
+            assertEquals(s,e.getCause().getCause().getCause().getMessage());
+            recreateConnection();
+            ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         }
         
         // explicit create case
@@ -614,10 +660,11 @@
         cons.close();
         
         // Using the ADDR method to create a more complicated queue
-        String addr = "ADDR:amq.direct/x512; {create: receiver, " +
-        "link : {name : 'MY.RESP.QUEUE', " + 
-        "x-declare : { auto-delete: true, exclusive: true, " +
-                     "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }";
+        String addr = "ADDR:MY.RESP.QUEUE; {create: sender, " +
+                      "node : {x-declare : { auto-delete: true, exclusive: true, " +
+                      "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } }," +
+                      "link : {x-bindings:[{exchange: 'amq.direct', key:x512}]}" +
+                      " }";
         queue = ssn.createQueue(addr);
         
         prod = ssn.createProducer(queue); 
@@ -692,9 +739,9 @@
         prod = ssn.createProducer(topic); 
         cons = ssn.createConsumer(topic);
         
-        assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",(
+        /*assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",(
                 (AMQSession_0_10)ssn).isQueueBound("vehicles", 
-                    "my-topic","bus", null));
+                    "my-topic","bus", null));*/
         
         assertTrue("The queue was not bound to vehicle exchange using car as the binding key",(
                 (AMQSession_0_10)ssn).isQueueBound("vehicles", 
@@ -710,7 +757,7 @@
         assertNotNull("consumer should receive a message",cons.receive(1000));
         cons.close();
     }
-    
+
     /**
      * Test Goal : Verify the default subjects used for each exchange type.
      * The default for amq.topic is "#" and for the rest it's ""
@@ -719,16 +766,10 @@
     {
         Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         
-        MessageConsumer queueCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.direct"));
-        MessageConsumer topicCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.topic"));
+        MessageConsumer topicCons = ssn.createConsumer(getDestination("ADDR:amq.topic"));
         
-        MessageProducer queueProducer = ssn.createProducer(new AMQAnyDestination("ADDR:amq.direct"));
-        MessageProducer topicProducer1 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/usa.weather"));
-        MessageProducer topicProducer2 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/sales"));
-        
-        queueProducer.send(ssn.createBytesMessage());
-        assertNotNull("The consumer subscribed to amq.direct " +
-        		"with empty binding key should have received the message ",queueCons.receive(1000));
+        MessageProducer topicProducer1 = ssn.createProducer(getDestination("ADDR:amq.topic/usa.weather"));
+        MessageProducer topicProducer2 = ssn.createProducer(getDestination("ADDR:amq.topic/sales"));
         
         topicProducer1.send(ssn.createTextMessage("25c"));
         assertEquals("The consumer subscribed to amq.topic " +
@@ -756,7 +797,7 @@
         
         Destination dest = ssn.createQueue(addr);
         MessageConsumer browseCons = ssn.createConsumer(dest);
-        MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test"));
+        MessageProducer prod = ssn.createProducer(ssn.createTopic("ADDR:amq.direct/test"));
         
         prod.send(ssn.createTextMessage("Test1"));
         prod.send(ssn.createTextMessage("Test2"));
@@ -791,8 +832,17 @@
     {
         Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);        
         Destination dest = ssn.createTopic("ADDR:amq.topic/foo; {link:{durable:true}}");
+        
+        System.out.println("------------ Creating consumer 1-----------------------");
+        
         MessageConsumer consumer1 = ssn.createConsumer(dest);
+        
+        System.out.println("------------ / Creating consumer 1-----------------------");
+        
+        System.out.println("------------ Creating consumer 2-----------------------");
         MessageConsumer consumer2 = ssn.createConsumer(dest);
+        System.out.println("------------/ Creating consumer 2-----------------------");
+        
         MessageProducer prod = ssn.createProducer(dest);
         
         prod.send(ssn.createTextMessage("A"));
@@ -819,7 +869,7 @@
         _connection = getConnection() ;
         _connection.start();
         ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);        
-        dest = ssn.createTopic("ADDR:my_queue; {create: always}");
+        dest = ssn.createQueue("ADDR:my_queue; {create: always}");
         consumer1 = ssn.createConsumer(dest);
         consumer2 = ssn.createConsumer(dest);
         prod = ssn.createProducer(dest);
@@ -842,17 +892,17 @@
     {
         Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         String addr = "ADDR:MRKT; " +
-        		"{" +
-        		    "create: receiver," + 
-        		    "node : {type: topic, x-declare: {type: topic} },"  +
-        		    "link:{" +
-        		         "name: my-topic," +
-        		         "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]" +
-        		         "}" +
-        		"}";
+                "{" +
+                    "create: receiver," + 
+                    "node : {type: topic, x-declare: {type: topic} },"  +
+                    "link:{" +
+                         "name: my-topic," +
+                         "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]" +
+                         "}" +
+                "}";
         
         // Using the ADDR method to create a more complicated topic
-        MessageConsumer  cons = ssn.createConsumer(new AMQAnyDestination(addr));
+        MessageConsumer  cons = ssn.createConsumer(getDestination(addr));
         
         assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",(
                 (AMQSession_0_10)ssn).isQueueBound("MRKT", 
@@ -878,7 +928,7 @@
     {
         Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         String str = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
-        Destination dest = ssn.createTopic(str);
+        Destination dest = ssn.createQueue(str);
         MessageConsumer consumer1 = ssn.createConsumer(dest);
         try
         {
@@ -889,11 +939,12 @@
         {            
         }
     }
-    
+
+   
     public void testQueueReceiversAndTopicSubscriber() throws Exception
     {
-        Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}");
-        Topic topic = new AMQAnyDestination("ADDR:amq.topic/test");
+        Queue queue = new AddressBasedQueue("my-queue; {create: always}");
+        Topic topic = new AddressBasedTopic("amq.topic/test");
         
         QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
         QueueReceiver receiver = qSession.createReceiver(queue);
@@ -917,7 +968,7 @@
         assertEquals("test2",((TextMessage)msg2).getText());  
     }
     
-    public void testDurableSubscriber() throws Exception
+    public void xtestDurableSubscriber() throws Exception
     {
         Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);        
         
@@ -977,7 +1028,7 @@
         // default (create never, assert never) -------------------
         // create never --------------------------------------------
         String addr1 = "ADDR:testQueue1;{create: always, delete: always}";
-        AMQDestination  dest = new AMQAnyDestination(addr1);
+        AddressBasedDestination  dest = getDestination(addr1);
         try
         {
             cons = jmsSession.createConsumer(dest);
@@ -989,11 +1040,11 @@
         }
         
         assertFalse("Queue not deleted as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));  
+                (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName()));  
         
         
         String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
-        dest = new AMQAnyDestination(addr2);
+        dest = getDestination(addr2);
         try
         {
             cons = jmsSession.createConsumer(dest);
@@ -1005,14 +1056,14 @@
         }
         
         assertFalse("Queue not deleted as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));  
+                (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName()));  
 
         
         String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
-        dest = new AMQAnyDestination(addr3);
+        dest = getDestination(addr3);
         try
         {
-            cons = jmsSession.createConsumer(dest);
+            //cons = jmsSession.createConsumer(dest);
             MessageProducer prod = jmsSession.createProducer(dest);
             prod.close();
         }
@@ -1022,7 +1073,7 @@
         }
         
         assertFalse("Queue not deleted as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));  
+                (AMQSession_0_10)jmsSession).isQueueExist(dest.getAddress().getName()));  
 
         
     }
@@ -1061,7 +1112,9 @@
         String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}";        
         try
         {
-            AMQAnyDestination dest = new AMQAnyDestination(addr3);
+            Destination dest = getDestination(addr3);
+            Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+            MessageConsumer cons = ssn.createConsumer(dest);
             fail("An exception should be thrown indicating it's an unsupported type");
         }
         catch(Exception e)
@@ -1072,14 +1125,14 @@
         String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}";        
         try
         {
-            AMQAnyDestination dest = new AMQAnyDestination(addr4);
+            Destination dest = getDestination(addr4);
             Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
             MessageConsumer cons = ssn.createConsumer(dest);
             fail("An exception should be thrown indicating it's an unsupported combination");
         }
         catch(Exception e)
         {
-            assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics"));
+            assertTrue(e.getCause().getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics"));
         }
     }
     
@@ -1089,7 +1142,7 @@
         MessageConsumer cons;
         MessageProducer prod;
         
-        AMQDestination  dest = new AMQAnyDestination(address);
+        AddressBasedDestination  dest = getDestination(address);
         cons = ssn.createConsumer(dest);
         prod = ssn.createProducer(dest);
         
@@ -1119,8 +1172,8 @@
         MessageConsumer cons = ssn.createConsumer(ssn.createTopic("ADDR:amq.topic/test"));
         MessageProducer prod = ssn.createProducer(null);
         
-        Queue queue = ssn.createQueue("ADDR:amq.topic/test");
-        prod.send(queue,ssn.createTextMessage("A"));
+        Topic topic = ssn.createTopic("ADDR:amq.topic/test");
+        prod.send(topic,ssn.createTextMessage("A"));
         
         Message msg = cons.receive(1000);
         assertNotNull(msg);
@@ -1147,7 +1200,7 @@
 		Destination replyToDest = AMQDestination.createDestination(replyTo);
 	    MessageConsumer replyToCons = session.createConsumer(replyToDest);
 	    		    			
-		Destination dest = session.createQueue("ADDR:amq.direct/test");
+		Destination dest = session.createTopic("ADDR:amq.direct/test");
 					
 		MessageConsumer cons = session.createConsumer(dest);
 		MessageProducer prod = session.createProducer(dest);
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
index a7efe49..8271481 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
@@ -20,15 +20,10 @@
  */
 package org.apache.qpid.test.client.message;
 
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.CustomJMSXProperty;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.management.common.mbeans.ManagedQueue;
-import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.nio.BufferOverflowException;
+import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -41,10 +36,16 @@
 import javax.jms.Topic;
 import javax.management.openmbean.CompositeDataSupport;
 import javax.management.openmbean.TabularData;
-import java.nio.BufferOverflowException;
-import java.util.Iterator;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 /**
  * From the API Docs getJMSDestination:
@@ -338,13 +339,13 @@
     public void testGetDestinationWithCustomExchange() throws Exception
     {
 
-        AMQDestination dest = new AMQAnyDestination(new AMQShortString("my-exchange"),
+        AMQDestination dest = new AMQQueue(new AMQShortString("my-exchange"),
                                                     new AMQShortString("direct"),
                                                     new AMQShortString("test"),
-                                                    false,
-                                                    false,
                                                     new AMQShortString("test"),
                                                     false,
+                                                    false,                                                    
+                                                    false,
                                                     new AMQShortString[]{new AMQShortString("test")});
         
         // to force the creation of my-exchange.
diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java b/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
index b4294ee..651d401 100644
--- a/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
+++ b/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
@@ -25,15 +25,14 @@
 import java.util.List;
 
 import javax.jms.Connection;
-import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.TextMessage;
 
-import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AddressBasedDestination;
 
 /**
  * A generic receiver which consumes messages
@@ -82,7 +81,7 @@
     {
     	super(con);
     	setSsn(con.createSession(isTransacted(), getAck_mode()));
-    	consumer = getSsn().createConsumer(new AMQAnyDestination(addr));
+    	consumer = getSsn().createConsumer(new AddressBasedDestination(addr));
     	if (!syncRcv)
     	{
     		consumer.setMessageListener(this);
diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java b/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java
index 14b9b73..806571b 100644
--- a/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java
+++ b/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java
@@ -36,8 +36,8 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AddressBasedDestination;
 import org.apache.qpid.tools.MessageFactory;
 
 /**
@@ -95,7 +95,7 @@
        this.iterations = Integer.getInteger("iterations", -1);
        this.sleep_time = Long.getLong("sleep_time", 1000);
        this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE));
-       this.dest = new AMQAnyDestination(addr);
+       this.dest = new AddressBasedDestination(addr);
        this.producer = getSsn().createProducer(dest);
        this.replyTo = getSsn().createTemporaryQueue();
        
diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
index 72ca48e..2d93daa 100644
--- a/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
+++ b/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
@@ -29,9 +29,7 @@
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -44,11 +42,8 @@
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
-import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.client.AddressBasedDestination;
 import org.apache.qpid.thread.Threading;
 
 /**
@@ -144,7 +139,7 @@
             controlCon = new AMQConnection(url);
             controlCon.start();
             
-            controlDest = new AMQAnyDestination("control; {create: always}"); // durable
+            controlDest = new AddressBasedDestination("control; {create: always}"); // durable
 
             // Create the session to setup the messages
             controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
index 121e94c..a71e6ac 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
@@ -31,11 +31,9 @@
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
-import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession_0_10;
-import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.client.AddressBasedDestination;
 import org.apache.qpid.messaging.Address;
 
 public class PerfBase
@@ -142,7 +140,7 @@
         controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         dest = createDestination();
-        controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR);
+        controllerQueue = new AddressBasedDestination(CONTROLLER_ADDR);
         myControlQueue = session.createQueue(myControlQueueAddr);
         msgType = MessageType.getType(params.getMessageType());
         System.out.println("Using " + msgType + " messages");
@@ -157,10 +155,10 @@
         {
             System.out.println("Prefix : " + prefix);
             Address addr = Address.parse(params.getAddress());
-            AMQAnyDestination temp = new AMQAnyDestination(params.getAddress());
-            int type = ((AMQSession_0_10)session).resolveAddressType(temp);
+            AddressBasedDestination temp = new AddressBasedDestination(addr);
+            temp.resolveAddress((AMQSession_0_10)session);
 
-            if ( type == AMQDestination.TOPIC_TYPE)
+            if (temp.isTopic())
             {
                 addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions());
                 System.out.println("Setting subject : " + addr);
@@ -171,11 +169,11 @@
                 System.out.println("Setting name : " + addr);
             }
 
-            return new AMQAnyDestination(addr);
+            return new AddressBasedDestination(addr);
         }
         else
         {
-            return new AMQAnyDestination(params.getAddress());
+            return new AddressBasedDestination(params.getAddress());
         }
     }