More workarounds to allow testing of both codecs side by side.
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkReceiver.java b/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkReceiver.java
index e3aaf62..46a89a9 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkReceiver.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkReceiver.java
@@ -1,21 +1,16 @@
 package org.apache.qpid.proton;
 
-import org.apache.qpid.proton.message2.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-
 public class BenchmarkReceiver
-{   
-    public static void main(String[] args) throws Exception
+{
+    static void messengerOld(int count) throws Exception
     {
-        int count = Integer.getInteger("count", 1);
-        
-        Messenger rec = Proton.messenger();
+        org.apache.qpid.proton.messenger.Messenger rec = new org.apache.qpid.proton.messenger.impl.MessengerImpl();
         rec.start();
         rec.subscribe("amqp://~localhost:5672");
         int i = 0;
-        Message m = null;
+        org.apache.qpid.proton.message.Message m = null;
         long start = 0;
-        while (i <count)
+        while (i < count)
         {
             rec.recv();
             while ((m = rec.get()) != null)
@@ -23,10 +18,48 @@
                 i++;
                 start = m.getCreationTime();
             }
-        }    
-        System.out.println("Count " + count + " i "  + i);
-        
-        double elapsed = (System.nanoTime() - start)/1000000; 
-        System.out.println(String.format("Time elapsed (end-to-end) for %s msgs : %s (milli secs)", count, elapsed));
+        }
+        System.out.println("Count " + count + " i " + i);
+
+        double elapsed = (System.nanoTime() - start) / 1000000;
+        System.out.println(String.format("Old codec Time elapsed (end-to-end) for %s msgs : %s (milli secs)", count, elapsed));
+    }
+
+    
+    /** Receives at least {@code count} messages via MessengerImpl2 (message2 codec), then prints the time elapsed since the last message's creation time. */
+    static void messengerNew(int count) throws Exception
+    {
+        org.apache.qpid.proton.messenger.Messenger2 rec = new org.apache.qpid.proton.messenger.impl.MessengerImpl2();
+        rec.start();
+        rec.subscribe("amqp://~localhost:5672");
+        int i = 0;
+        org.apache.qpid.proton.message2.Message m = null;
+        long start = 0;
+        while (i < count)
+        {
+            rec.recv();
+            while ((m = rec.get()) != null)
+            {
+                i++;
+                start = m.getCreationTime(); // NOTE(review): BenchmarkSender stamps this with its own System.nanoTime(); confirm both JVMs share a clock origin
+            }
+        }
+        double elapsed = (System.nanoTime() - start) / 1000000.0; // double division: long / long dropped the sub-millisecond part
+        System.out.println(String.format("New codec Time elapsed (end-to-end) for %s msgs : %s (milli secs)", count, elapsed));
+    }
+ 
+    public static void main(String[] args) throws Exception
+    {
+        int count = Integer.getInteger("count", 1);
+        boolean isNewCodec = Boolean.getBoolean("new-codec");
+
+        if (isNewCodec)
+        {
+            messengerNew(count);
+        }
+        else
+        {
+            messengerOld(count);
+        }
     }
 }
\ No newline at end of file
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkSender.java b/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkSender.java
index a118c2d..a01296d 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkSender.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkSender.java
@@ -6,103 +6,137 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.qpid.proton.message2.Message;
-import org.apache.qpid.proton.messenger.Messenger;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 
 public class BenchmarkSender
 {
     static String largeString = "Knowing that millions of people around the world would be watching in person and on television and expecting great";
+
     static String smallString = "Hello world";
+
     static long n = Long.MAX_VALUE;
 
-    public static Message getMessage(int op)
+    enum OP {STR_SMALL, STR_BIG, LONG, LIST_SMALL, LIST_BIG, MAP_SMALL, MAP_BIG, NESTED_SMALL, NESTED_BIG};
+ 
+    public static Object getPayload(int op)
     {
-        Message m = Proton.message2();
-        m.setAddress("amqp://localhost:5672");
         switch (op)
         {
-        case 0 :
-            m.setBody(smallString);
-            break;
-        case 1 :
-            m.setBody(largeString);
-            break;
-        case 2 :
-            m.setBody(n);
-            break;
-        case 3 :
-           List<String> listSm = new ArrayList<String>();
-           for (int i=0; i<5; i++)
-           {
-               listSm.add(smallString);
-           } 
-           m.setBody(listSm);
-           break;
-        case 4 :
+        case 0:
+            return smallString;
+        case 1:
+            return largeString;
+        case 2:
+            return n;
+        case 3:
+            List<String> listSm = new ArrayList<String>();
+            for (int i = 0; i < 5; i++)
+            {
+                listSm.add(smallString);
+            }
+            return listSm;
+        case 4:
             List<String> listBig = new ArrayList<String>();
-            for (int i=0; i<100; i++)
+            for (int i = 0; i < 100; i++)
             {
                 listBig.add(smallString);
-            } 
-            m.setBody(listBig);
-            break;
+            }
+            return listBig;
         case 5:
-          Map<String, Integer> mapSm = new HashMap<String, Integer>(); 
-          for (int i=0; i<5; i++)
-          {
-              mapSm.put(String.valueOf(i), i);
-          }  
-          m.setBody(mapSm);
-          break;
+            Map<String, Integer> mapSm = new HashMap<String, Integer>();
+            for (int i = 0; i < 5; i++)
+            {
+                mapSm.put(String.valueOf(i), i);
+            }
+            return mapSm;
         case 6:
-            Map<String, Integer> mapBig = new HashMap<String, Integer>(); 
-            for (int i=0; i<100; i++)
+            Map<String, Integer> mapBig = new HashMap<String, Integer>();
+            for (int i = 0; i < 100; i++)
             {
                 mapBig.put(String.valueOf(i), i);
-            }  
-            m.setBody(mapBig);
+            }
+            return mapBig;
         case 7:
-            List<Integer> l1 = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
-            Map<String, List<Integer>> nestedMapSm = new HashMap<String, List<Integer>>(); 
-            for (int i=0; i<5; i++)
+            List<Integer> l1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+            Map<String, List<Integer>> nestedMapSm = new HashMap<String, List<Integer>>();
+            for (int i = 0; i < 5; i++)
             {
                 nestedMapSm.put(String.valueOf(i), l1);
             }
-            m.setBody(nestedMapSm);
-            break;
+            return nestedMapSm;
         case 8:
-            List<Integer> l2 = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
-            Map<String, List<Integer>> nestedMapBig = new HashMap<String, List<Integer>>(); 
-            for (int i=0; i<100; i++)
+            List<Integer> l2 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+            Map<String, List<Integer>> nestedMapBig = new HashMap<String, List<Integer>>();
+            for (int i = 0; i < 100; i++)
             {
                 nestedMapBig.put(String.valueOf(i), l2);
             }
-            m.setBody(nestedMapBig);
-            break;
+            return nestedMapBig;
+        default:
+            return smallString;
         }
-        return m;
     }
-    
-    public static void main(String[] args) throws Exception
+
+    /** Sends {@code count} copies of the payload selected by {@code op} via MessengerImpl (old message codec) and prints the elapsed send time. */
+    static void messengerOld(int count, int op) throws Exception
     {
-        int count = Integer.getInteger("count", 1);
-        int op = Integer.getInteger("op", 4);
-        
-        Message msg = getMessage(op);
-        Messenger sender = Proton.messenger();
+        Object obj = getPayload(op);
+        org.apache.qpid.proton.message.Message msg = Proton.message();
+        msg.setAddress("amqp://localhost:5672");
+        msg.setBody(new AmqpValue(obj));
+        org.apache.qpid.proton.messenger.Messenger sender = new org.apache.qpid.proton.messenger.impl.MessengerImpl();
         sender.start();
-        
         long start = System.nanoTime();
-        for (int i=0; i<count; i++)
+        for (int i = 0; i < count; i++)
         {
             msg.setCreationTime(start);
             sender.put(msg);
         }
         sender.send();
 
-        double elapsed = (System.nanoTime() - start)/1000000; 
-        System.out.println(String.format("Time elapsed for sending %s msgs for op [%s] : %s (milli secs)", count, op, elapsed));
-        
+        Object opLabel = (op >= 0 && op < OP.values().length) ? OP.values()[op] : Integer.valueOf(op); // getPayload() accepts any op, so never index OP blindly
+        double elapsed = (System.nanoTime() - start) / 1000000.0; // double division keeps sub-millisecond precision
+        System.out.println(String.format("Old codec Time elapsed for sending %s msgs for op [%s] : %s (milli secs)", count, opLabel, elapsed));
+    }
+
+    /** Sends {@code count} copies of the payload selected by {@code op} via MessengerImpl2 (message2 codec) and prints the elapsed send time. */
+    static void messengerNew(int count, int op) throws Exception
+    {
+        Object obj = getPayload(op);
+        org.apache.qpid.proton.message2.Message msg = Proton.message2();
+        msg.setAddress("amqp://localhost:5672");
+        msg.setBody(obj);
+        org.apache.qpid.proton.messenger.Messenger2 sender = new org.apache.qpid.proton.messenger.impl.MessengerImpl2();
+        sender.start();
+        long start = System.nanoTime();
+        for (int i = 0; i < count; i++)
+        {
+            msg.setCreationTime(start);
+            sender.put(msg);
+        }
+        sender.send();
+
+        Object opLabel = (op >= 0 && op < OP.values().length) ? OP.values()[op] : Integer.valueOf(op); // same label as messengerOld; getPayload() accepts any op
+        double elapsed = (System.nanoTime() - start) / 1000000.0; // double division keeps sub-millisecond precision
+        System.out.println(String.format("New Codec Time elapsed for sending %s msgs for op [%s] : %s (milli secs)", count, opLabel, elapsed));
+    }
+
+    public static void main(String[] args) throws Exception
+    {
+        int count = Integer.getInteger("count", 1);
+        int op = Integer.getInteger("op", 1);
+
+        boolean isNewCodec = Boolean.getBoolean("new-codec");
+
+        if (isNewCodec)
+        {
+            messengerNew(count, op);
+        }
+        else
+        {
+            messengerOld(count, op);
+        }
+
         final Object ob = new Object();
         synchronized (ob)
         {
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/TestReceiver.java b/proton-j/src/main/java/org/apache/qpid/proton/TestReceiver.java
deleted file mode 100644
index 4143caf..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/TestReceiver.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.qpid.proton;
-import java.io.IOException;
-
-import org.apache.qpid.proton.message2.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-
-
-public class TestReceiver
-{
-    public static void main(String[] args) throws IOException
-    {
-        Messenger messenger = Proton.messenger();
-        messenger.start();
-        messenger.subscribe("amqp://~localhost:5672");
-        messenger.recv(); 
-        Message m = messenger.get();
-        System.out.println("Received " + m);
-    }
-
-}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/TestSender.java b/proton-j/src/main/java/org/apache/qpid/proton/TestSender.java
deleted file mode 100644
index f522aa8..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/TestSender.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.qpid.proton;
-import java.io.IOException;
-
-import org.apache.qpid.proton.message2.AmqpValue;
-import org.apache.qpid.proton.message2.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-
-public class TestSender
-{
-    public static void main(String[] args) throws IOException, InterruptedException
-    {
-        Messenger messenger = Proton.messenger();
-        messenger.start();
-        for (int i=0; i < 1; i++)
-        {
-            Message m = Proton.message2();
-            m.setAddress("amqp://localhost:5672");
-            m.setBody(new AmqpValue("Msg" + i));
-            messenger.put(m);
-        }
-        messenger.send();
-    }
-}
\ No newline at end of file
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
index cd6d93d..6d3f362 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
@@ -23,7 +23,7 @@
 import java.io.IOException;
 
 import org.apache.qpid.proton.TimeoutException;
-import org.apache.qpid.proton.message2.Message;
+import org.apache.qpid.proton.message.Message;
 
 import org.apache.qpid.proton.messenger.impl.MessengerImpl;
 
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger2.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger2.java
new file mode 100644
index 0000000..47ab412
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger2.java
@@ -0,0 +1,278 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.TimeoutException;
+import org.apache.qpid.proton.message2.Message;
+import org.apache.qpid.proton.messenger.impl.MessengerImpl2;
+
+/**
+ *
+ *  Messenger defines a high level interface for sending and receiving
+ *  messages. Every Messenger contains a single logical queue of
+ *  incoming messages and a single logical queue of outgoing
+ *  messages. The messages in these queues may be destined for, or
+ *  originate from, a variety of addresses.
+ *
+ *  <h3>Address Syntax</h3>
+ *
+ *  An address has the following form:
+ *
+ *    [ amqp[s]:// ] [user[:password]@] domain [/[name]]
+ *
+ *  Where domain can be one of:
+ *
+ *    host | host:port | ip | ip:port | name
+ *
+ *  The following are valid examples of addresses:
+ *
+ *   - example.org
+ *   - example.org:1234
+ *   - amqp://example.org
+ *   - amqps://example.org
+ *   - example.org/incoming
+ *   - amqps://example.org/outgoing
+ *   - amqps://fred:trustno1@example.org
+ *   - 127.0.0.1:1234
+ *   - amqps://127.0.0.1:1234
+ *
+ *  <h3>Sending &amp; Receiving Messages</h3>
+ *
+ *  The Messenger interface works in conjunction with the Message
+ *  class. The Message class is a mutable holder of message content.
+ *  The put method will encode the content in a given Message object
+ *  into the outgoing message queue leaving that Message object free
+ *  to be modified or discarded without having any impact on the
+ *  content in the outgoing queue.
+ *
+ *  Similarly, the get method will decode the content in the incoming
+ *  message queue into the supplied Message object.
+*/
+public interface Messenger2
+{
+
+    public static final class Factory
+    {
+        public static Messenger2 create() {
+            return new MessengerImpl2();
+        }
+
+        public static Messenger2 create(String name) {
+            return new MessengerImpl2(name);
+        }
+    }
+
+    /**
+     * Flag for use with reject(), accept() and settle() methods.
+     */
+    static final int CUMULATIVE = 0x01;
+
+    /**
+     * Places the content contained in the message onto the outgoing
+     * queue of the Messenger. This method will never block. The
+     * send call may be used to block until the messages are
+     * sent. Either a send() or a recv() call is necessary at present
+     * to cause the messages to actually be sent out.
+     */
+    void put(Message message) throws MessengerException;
+
+    /**
+     * Blocks until the outgoing queue is empty and, in the event that
+     * an outgoing window has been set, until the messages in that
+     * window have been received by the target to which they were
+     * sent, or the operation times out. The timeout property
+     * controls how long a Messenger will block before timing out.
+     */
+    void send() throws TimeoutException;
+
+    void send(int n) throws TimeoutException;
+
+    /**
+     * Subscribes the Messenger to messages originating from the
+     * specified source. The source is an address as specified in the
+     * Messenger introduction with the following addition. If the
+     * domain portion of the address begins with the '~' character,
+     * the Messenger will interpret the domain as host/port, bind
+     * to it, and listen for incoming messages. For example
+     * "~0.0.0.0", "amqp://~0.0.0.0" will bind to any local interface
+     * and listen for incoming messages.
+     */
+    void subscribe(String source) throws MessengerException;
+    /**
+     * Receives an arbitrary number of messages into the
+     * incoming queue of the Messenger. This method will block until
+     * at least one message is available or the operation times out.
+     */
+    void recv() throws TimeoutException;
+    /**
+     * Receives up to the specified number of messages into the
+     * incoming queue of the Messenger. This method will block until
+     * at least one message is available or the operation times out.
+     */
+    void recv(int count) throws TimeoutException;
+    /**
+     * Returns the capacity of the incoming message queue of
+     * messenger. Note this count does not include those messages
+     * already available on the incoming queue (see
+     * incoming()). Rather it returns the number of incoming queue
+     * entries available for receiving messages
+     */
+    int receiving();
+    /**
+     * Returns the message from the head of the incoming message
+     * queue.
+     */
+    Message get();
+
+    /**
+     * Transitions the Messenger to an active state. A Messenger is
+     * initially created in an inactive state. When inactive, a
+     * Messenger will not send or receive messages from its internal
+     * queues. A Messenger must be started before calling send() or
+     * recv().
+     */
+    void start() throws IOException;
+    /**
+     * Transitions the Messenger to an inactive state. An inactive
+     * Messenger will not send or receive messages from its internal
+     * queues. A Messenger should be stopped before being discarded to
+     * ensure a clean shutdown handshake occurs on any internally managed
+     * connections.
+     */
+    void stop();
+
+    boolean stopped();
+
+    /** Sends or receives any outstanding messages queued for a
+     * messenger.  If timeout is zero, no blocking is done.  A timeout
+     * of -1 blocks forever, otherwise timeout is the maximum time (in
+     * millisecs) to block.  Returns True if work was performed.
+     */
+    boolean work(long timeout) throws TimeoutException;
+
+    void interrupt();
+
+    void setTimeout(long timeInMillis);
+    long getTimeout();
+
+    boolean isBlocking();
+    void setBlocking(boolean b);
+
+    /**
+     * Returns a count of the messages currently on the outgoing queue
+     * (i.e. those that have been put() but not yet actually sent
+     * out).
+     */
+    int outgoing();
+    /**
+     * Returns a count of the messages available on the incoming
+     * queue.
+     */
+    int incoming();
+
+    int getIncomingWindow();
+    void setIncomingWindow(int window);
+
+    int getOutgoingWindow();
+    void setOutgoingWindow(int window);
+
+    /**
+     * Returns a token which can be used to accept or reject the
+     * message returned in the previous get() call.
+     */
+    Tracker incomingTracker();
+    /**
+     * Returns a token which can be used to track the status of the
+     * message of the previous put() call.
+     */
+    Tracker outgoingTracker();
+
+    /**
+     * Rejects messages retrieved from the incoming message queue. The
+     * tracker object for a message is obtained through a call to
+     * incomingTracker() following a get(). If the flags argument
+     * contains CUMULATIVE, then all messages up to the one identified
+     * by the tracker will be rejected.
+     */
+    void reject(Tracker tracker, int flags);
+    /**
+     * Accepts messages retrieved from the incoming message queue. The
+     * tracker object for a message is obtained through a call to
+     * incomingTracker() following a get(). If the flags argument
+     * contains CUMULATIVE, then all messages up to the one identified
+     * by the tracker will be accepted.
+     */
+    void accept(Tracker tracker, int flags);
+    void settle(Tracker tracker, int flags);
+
+    /**
+     * Gets the last known remote state of the delivery associated
+     * with the given tracker.
+     */
+    Status getStatus(Tracker tracker);
+
+    void route(String pattern, String address);
+
+    void rewrite(String pattern, String address);
+
+    /**
+     * Set the path to the certificate file.
+     */
+    void setCertificate(String certificate);
+
+    /**
+     * Get the path to the certificate file.
+     */
+    String getCertificate();
+
+    /**
+     * Set the path to private key file.
+     */
+    void setPrivateKey(String privateKey);
+
+    /**
+     * Get the path to the private key file.
+     */
+    String getPrivateKey();
+
+    /**
+     * Set the password for private key file.
+     */
+    void setPassword(String password);
+
+    /**
+     * Get the password for the private key file.
+     */
+    String getPassword();
+
+    /**
+     * Set the path to the trusted certificate database.
+     */
+    void setTrustedCertificates(String trusted);
+
+    /**
+     * Get the path to the trusted certificate database.
+     */
+    String getTrustedCertificates();
+
+}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
index f0d7119..e6475b9 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
@@ -45,7 +45,7 @@
 import org.apache.qpid.proton.engine.SslDomain;
 import org.apache.qpid.proton.engine.Ssl;
 import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.message2.Message;
+import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.messenger.Messenger;
 import org.apache.qpid.proton.messenger.MessengerException;
 import org.apache.qpid.proton.messenger.Status;
@@ -461,7 +461,7 @@
         StoreEntry entry = _incomingStore.get( null );
         if (entry != null)
         {
-            Message message = Proton.message2();
+            Message message = Proton.message();
             message.decode( entry.getEncodedMsg(), 0, entry.getEncodedLength() );
 
             _incomingTracker = new TrackerImpl(TrackerImpl.Type.INCOMING,
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl2.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl2.java
new file mode 100644
index 0000000..ef861bf
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl2.java
@@ -0,0 +1,1547 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.InterruptException;
+import org.apache.qpid.proton.TimeoutException;
+import org.apache.qpid.proton.driver.Connector;
+import org.apache.qpid.proton.driver.Driver;
+import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.SslDomain;
+import org.apache.qpid.proton.engine.Ssl;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.message2.Message;
+import org.apache.qpid.proton.messenger.Messenger2;
+import org.apache.qpid.proton.messenger.MessengerException;
+import org.apache.qpid.proton.messenger.Status;
+import org.apache.qpid.proton.messenger.Tracker;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+
+public class MessengerImpl2 implements Messenger2
+{
+    private enum LinkCreditMode
+    {
+        // method for replenishing credit
+        LINK_CREDIT_EXPLICIT,   // recv(N)
+        LINK_CREDIT_AUTO;       // recv()
+    }
+
+    private static final EnumSet<EndpointState> UNINIT = EnumSet.of(EndpointState.UNINITIALIZED);
+    private static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE);
+    private static final EnumSet<EndpointState> CLOSED = EnumSet.of(EndpointState.CLOSED);
+    private static final EnumSet<EndpointState> ANY = EnumSet.allOf(EndpointState.class);
+
+    private final Logger _logger = Logger.getLogger("proton.messenger");
+    private final String _name;
+    private long _timeout = -1;
+    private boolean _blocking = true;
+    private long _nextTag = 1;
+    private Driver _driver;
+    private LinkCreditMode _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
+    private final int _credit_batch = 1024;   // credit_mode == LINK_CREDIT_AUTO
+    private int _credit;        // available
+    private int _distributed;    // outstanding credit
+    private int _receivers;      // total # receiver Links
+    private int _draining;       // # Links in drain state
+    private List<Receiver> _credited = new ArrayList<Receiver>();
+    private List<Receiver> _blocked = new ArrayList<Receiver>();
+    private long _next_drain;
+    private TrackerImpl _incomingTracker;
+    private TrackerImpl _outgoingTracker;
+    private Store _incomingStore = new Store();
+    private Store _outgoingStore = new Store();
+    private List<Connector> _awaitingDestruction = new ArrayList<Connector>();
+    private int _sendThreshold;
+
+    private Transform _routes = new Transform();
+    private Transform _rewrites = new Transform();
+
+    private String _certificate;
+    private String _privateKey;
+    private String _password;
+    private String _trustedDb;
+
+
+    /**
+     * @deprecated This constructor's visibility will be reduced to the default scope in a future release.
+     * Client code outside this module should use a {@link Messenger2.Factory} instead.
+     */
+    @Deprecated public MessengerImpl2()
+    {
+        this(java.util.UUID.randomUUID().toString());
+    }
+
+    /**
+     * @deprecated This constructor's visibility will be reduced to the default scope in a future release.
+     * Client code outside this module should use a {@link Messenger2.Factory} instead.
+     */
+    @Deprecated public MessengerImpl2(String name)
+    {
+        _name = name;
+    }
+
+    public void setTimeout(long timeInMillis)
+    {
+        _timeout = timeInMillis;
+    }
+
+    public long getTimeout()
+    {
+        return _timeout;
+    }
+
+    public boolean isBlocking()
+    {
+        return _blocking;
+    }
+
+    public void setBlocking(boolean b)
+    {
+        _blocking = b;
+    }
+
+    public void setCertificate(String certificate)
+    {
+        _certificate = certificate;
+    }
+
+    public String getCertificate()
+    {
+        return _certificate;
+    }
+
+    public void setPrivateKey(String privateKey)
+    {
+        _privateKey = privateKey;
+    }
+
+    public String getPrivateKey()
+    {
+        return _privateKey;
+    }
+
+    public void setPassword(String password)
+    {
+        _password = password;
+    }
+
+    public String getPassword()
+    {
+        return _password;
+    }
+
+    public void setTrustedCertificates(String trusted)
+    {
+        _trustedDb = trusted;
+    }
+
+    public String getTrustedCertificates()
+    {
+        return _trustedDb;
+    }
+
+    public void start() throws IOException
+    {
+        _driver = Proton.driver();
+    }
+
+    public void stop()
+    {
+        if (_driver != null) {
+            if(_logger.isLoggable(Level.FINE))
+            {
+                _logger.fine(this + " about to stop");
+            }
+            //close all connections
+            for (Connector<?> c : _driver.connectors())
+            {
+                Connection connection = c.getConnection();
+                connection.close();
+            }
+            //stop listeners
+            for (Listener<?> l : _driver.listeners())
+            {
+                try
+                {
+                    l.close();
+                }
+                catch (IOException e)
+                {
+                    _logger.log(Level.WARNING, "Error while closing listener", e);
+                }
+            }
+            waitUntil(_allClosed);
+        }
+    }
+
+    public boolean stopped()
+    {
+        return _allClosed.test();
+    }
+
+    public boolean work(long timeout) throws TimeoutException
+    {
+        if (_driver == null) { return false; }
+        _worked = false;
+        return waitUntil(_workPred, timeout);
+    }
+
+    public void interrupt()
+    {
+        if (_driver != null) {
+            _driver.wakeup();
+        }
+    }
+
+    /**
+     * Default address rewrite, applied when no explicit rewrite rule matches.
+     *
+     * An address containing {@code '@'} is parsed and re-assembled from its
+     * scheme, host, port and name only; any other address (including
+     * {@code null}) is returned unchanged.
+     *
+     * @param address the address to rewrite, may be {@code null}
+     * @return the re-assembled address, or {@code address} itself when it
+     *         contains no {@code '@'}
+     */
+    private String defaultRewrite(String address) {
+        if (address == null || !address.contains("@")) {
+            return address;
+        }
+
+        final Address parsed = new Address(address);
+        final StringBuilder rebuilt = new StringBuilder();
+
+        final String scheme = parsed.getScheme();
+        if (scheme != null) {
+            rebuilt.append(scheme).append("://");
+        }
+
+        final String host = parsed.getHost();
+        if (host != null) {
+            rebuilt.append(host);
+        }
+
+        final String port = parsed.getPort();
+        if (port != null) {
+            rebuilt.append(":").append(port);
+        }
+
+        final String name = parsed.getName();
+        if (name != null) {
+            rebuilt.append("/").append(name);
+        }
+
+        return rebuilt.toString();
+    }
+
+
+    // Address of the message most recently passed to rewriteMessage(); read
+    // back by restoreMessage() to undo the rewrite.
+    private String _original;
+
+    /**
+     * Replaces the message's address with its rewritten form, remembering the
+     * original in {@code _original} so restoreMessage() can put it back.
+     *
+     * The first matching rule in {@code _rewrites} wins; when none applies the
+     * default rewrite is used.
+     *
+     * @param m the message whose address is rewritten in place
+     */
+    private void rewriteMessage(Message m)
+    {
+        _original = m.getAddress();
+        final String rewritten = _rewrites.apply(_original)
+            ? _rewrites.result()
+            : defaultRewrite(_original);
+        m.setAddress(rewritten);
+    }
+
+    /**
+     * Sets the message's address back to the value saved in {@code _original}
+     * by the last call to rewriteMessage().
+     *
+     * @param m the message whose address is restored
+     */
+    private void restoreMessage(Message m)
+    {
+        m.setAddress(_original);
+    }
+
+    /**
+     * Applies the routing rules to an address.
+     *
+     * @param addr the address to route
+     * @return the result of the matching rule in {@code _routes}, or
+     *         {@code addr} unchanged when no rule applies
+     */
+    private String routeAddress(String addr)
+    {
+        return _routes.apply(addr) ? _routes.result() : addr;
+    }
+
+    /**
+     * Queues a message for sending.
+     *
+     * The message is encoded (with its address and reply-to temporarily
+     * rewritten) into an entry of the outgoing store, the outgoing tracker is
+     * pointed at that entry, and an attempt is made to push it onto the sender
+     * link for the routed address.
+     *
+     * @param m the message to send; its address is restored to the original
+     *          value before this method returns
+     * @throws IllegalStateException if the messenger has no driver
+     * @throws MessengerException if the routed address has no host
+     */
+    public void put(Message m) throws MessengerException
+    {
+        if (_driver == null) {
+            throw new IllegalStateException("cannot put while messenger is stopped");
+        }
+
+        if(_logger.isLoggable(Level.FINE))
+        {
+            _logger.fine(this + " about to put message: " + m);
+        }
+
+        // Validate the destination before creating the store entry: rejecting
+        // the address afterwards would leave an entry without an encoded
+        // payload behind in the outgoing store.
+        String routedAddress = routeAddress(m.getAddress());
+        Address address = new Address(routedAddress);
+        if (address.getHost() == null)
+        {
+            throw new MessengerException("unable to send to address: " + routedAddress);
+        }
+
+        StoreEntry entry = _outgoingStore.put( m.getAddress() );
+        _outgoingTracker = new TrackerImpl(TrackerImpl.Type.OUTGOING,
+                                           _outgoingStore.trackEntry(entry));
+
+        rewriteMessage(m);
+
+        try {
+            adjustReplyTo(m);
+
+            // Encode into a buffer that is doubled until the message fits.
+            int encoded;
+            byte[] buffer = new byte[5*1024];
+            while (true)
+            {
+                try
+                {
+                    encoded = m.encode(buffer, 0, buffer.length);
+                    break;
+                } catch (java.nio.BufferOverflowException e) {
+                    buffer = new byte[buffer.length*2];
+                }
+            }
+            entry.setEncodedMsg( buffer, encoded );
+        }
+        finally
+        {
+            // always undo the address rewrite, even if encoding failed
+            restoreMessage(m);
+        }
+
+        Sender sender = getLink(address, new SenderFinder(address.getName()));
+        // the store is keyed by the original (restored) message address
+        pumpOut(m.getAddress(), sender);
+    }
+
+    /**
+     * Releases the bookkeeping held for a link that is going away.
+     *
+     * Credit still outstanding on a receiver is moved from
+     * {@code _distributed} back to {@code _credit}. Every delivery on the link
+     * is detached from its store entry, and entries whose delivery is still
+     * buffered are marked {@code ABORTED}. Finally the link is reported via
+     * {@code linkRemoved}.
+     *
+     * @param link the link being reclaimed
+     */
+    private void reclaimLink(Link link)
+    {
+        if (link instanceof Receiver)
+        {
+            final int unused = link.getCredit();
+            if (unused > 0)
+            {
+                _credit += unused;
+                _distributed -= unused;
+            }
+        }
+
+        for (Delivery d = link.head(); d != null; d = d.next())
+        {
+            final StoreEntry tracked = (StoreEntry) d.getContext();
+            if (tracked == null)
+            {
+                continue;
+            }
+            tracked.setDelivery(null);
+            if (d.isBuffered()) {
+                tracked.setStatus(Status.ABORTED);
+            }
+        }
+
+        linkRemoved(link);
+    }
+
+    /**
+     * Sends the next stored message for an address on the given sender.
+     *
+     * When the outgoing store holds nothing for the address the sender is told
+     * it is drained. Otherwise a delivery with a fresh tag is created, the
+     * encoded message is handed to the sender, and the store entry is freed
+     * whether or not the send succeeded.
+     *
+     * @param address key used to look up the entry in the outgoing store
+     * @param sender  link to send on
+     * @return 0 when nothing was pending or the send succeeded, otherwise the
+     *         negative value returned by {@code sender.send}
+     */
+    private int pumpOut( String address, Sender sender )
+    {
+        final StoreEntry pending = _outgoingStore.get( address );
+        if (pending == null) {
+            sender.drained();
+            return 0;
+        }
+
+        final byte[] tag = String.valueOf(_nextTag++).getBytes();
+        final Delivery delivery = sender.delivery(tag);
+        pending.setDelivery( delivery );
+        _logger.log(Level.FINE, "Sending on delivery: " + delivery);
+
+        final int sent = sender.send( pending.getEncodedMsg(), 0, pending.getEncodedLength());
+        final boolean failed = sent < 0;
+        if (!failed) {
+            sender.advance();
+        }
+        _outgoingStore.freeEntry( pending );
+
+        if (failed) {
+            _logger.log(Level.WARNING, "Send error: " + sent);
+            return sent;
+        }
+        return 0;
+    }
+
+    /**
+     * Sends all outgoing messages; equivalent to {@code send(-1)}.
+     */
+    public void send() throws TimeoutException
+    {
+        send(-1);
+    }
+
+    /**
+     * Waits until outgoing messages have been sent and settled.
+     *
+     * Sets {@code _sendThreshold} to the number of outgoing messages that may
+     * remain when the wait ends: 0 when {@code n} is -1, otherwise
+     * {@code outgoing() - n} clamped at 0. It then waits on the
+     * {@code _sentSettled} predicate.
+     *
+     * @param n number of messages to wait for, or -1 for all of them
+     * @throws IllegalStateException if the messenger has no driver
+     */
+    public void send(int n) throws TimeoutException
+    {
+        if (_driver == null) {
+            throw new IllegalStateException("cannot send while messenger is stopped");
+        }
+
+        if(_logger.isLoggable(Level.FINE))
+        {
+            _logger.fine(this + " about to send");
+        }
+
+        _sendThreshold = (n == -1) ? 0 : Math.max(0, outgoing() - n);
+
+        waitUntil(_sentSettled);
+    }
+
+    /**
+     * Grants credit for incoming messages and waits until one is available.
+     *
+     * With {@code n == -1} the credit mode becomes {@code LINK_CREDIT_AUTO}.
+     * Otherwise it becomes {@code LINK_CREDIT_EXPLICIT} and the undistributed
+     * credit is set to whatever part of {@code n} is not already distributed
+     * (0 when {@code n} does not exceed the distributed amount).
+     *
+     * @param n maximum number of messages to receive, or -1 for automatic
+     *          credit management
+     * @throws IllegalStateException if the messenger has no driver
+     */
+    public void recv(int n) throws TimeoutException
+    {
+        if (_driver == null) {
+            throw new IllegalStateException("cannot recv while messenger is stopped");
+        }
+
+        final boolean automatic = (n == -1);
+
+        if (_logger.isLoggable(Level.FINE) && !automatic)
+        {
+            _logger.fine(this + " about to wait for up to " + n + " messages to be received");
+        }
+
+        if (automatic)
+        {
+            _credit_mode = LinkCreditMode.LINK_CREDIT_AUTO;
+        }
+        else
+        {
+            _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
+            // anything not covered by n is cancelled
+            _credit = (n > _distributed) ? n - _distributed : 0;
+        }
+
+        distributeCredit();
+
+        waitUntil(_messageAvailable);
+    }
+
+    /**
+     * Receives with automatic credit management; equivalent to
+     * {@code recv(-1)}.
+     */
+    public void recv() throws TimeoutException
+    {
+        recv(-1);
+    }
+
+    /**
+     * @return the sum of the credit not yet handed out ({@code _credit}) and
+     *         the credit currently distributed to links ({@code _distributed})
+     */
+    public int receiving()
+    {
+        return _credit + _distributed;
+    }
+
+    /**
+     * Takes the next message out of the incoming store.
+     *
+     * The stored bytes are decoded into a new message, the incoming tracker is
+     * pointed at the entry, and the entry is then freed.
+     *
+     * @return the decoded message, or {@code null} when the incoming store has
+     *         no entry to offer
+     */
+    public Message get()
+    {
+        final StoreEntry next = _incomingStore.get( null );
+        if (next == null)
+        {
+            return null;
+        }
+
+        final Message decoded = Proton.message2();
+        decoded.decode( next.getEncodedMsg(), 0, next.getEncodedLength() );
+
+        _incomingTracker = new TrackerImpl(TrackerImpl.Type.INCOMING,
+                                           _incomingStore.trackEntry(next));
+
+        _incomingStore.freeEntry( next );
+        return decoded;
+    }
+
+    /**
+     * Moves the receiver's current delivery into the incoming store and
+     * updates the credit accounting.
+     *
+     * Nothing happens unless the current delivery is readable and complete
+     * (not partial).
+     *
+     * @param address  address under which the new store entry is filed
+     * @param receiver link whose current delivery is consumed
+     * @return always 0
+     * @throws IllegalStateException if fewer bytes are read than the delivery
+     *         reported as pending
+     */
+    private int pumpIn(String address, Receiver receiver)
+    {
+        Delivery delivery = receiver.current();
+        if (delivery.isReadable() && !delivery.isPartial())
+        {
+            StoreEntry entry = _incomingStore.put( address );
+            entry.setDelivery( delivery );
+
+            _logger.log(Level.FINE, "Readable delivery found: " + delivery);
+
+            // copy the whole pending payload into the store entry
+            int size = delivery.pending();
+            byte[] buffer = new byte[size];
+            int read = receiver.recv( buffer, 0, buffer.length );
+            if (read != size) {
+                throw new IllegalStateException();
+            }
+            entry.setEncodedMsg( buffer, size );
+            receiver.advance();
+
+            // account for the used credit, replenish if
+            // low (< 20% maximum per-link batch) and
+            // extra credit available
+            assert(_distributed > 0);
+            _distributed--;
+            if (!receiver.getDrain() && _blocked.isEmpty() && _credit > 0)
+            {
+                final int max = perLinkCredit();
+                // 20% of the per-link maximum, rounded to nearest
+                final int lo_thresh = (int)(max * 0.2 + 0.5);
+                if (receiver.getRemoteCredit() < lo_thresh)
+                {
+                    // top the link back up towards max, limited by what is left
+                    final int more = Math.min(_credit, max - receiver.getRemoteCredit());
+                    _credit -= more;
+                    _distributed += more;
+                    receiver.flow(more);
+                }
+            }
+            // check if blocked
+            if (receiver.getRemoteCredit() == 0 && _credited.contains(receiver))
+            {
+                // out of credit: move the link from the credited to the blocked
+                // list, ending any drain that was in progress on it
+                _credited.remove(receiver);
+                if (receiver.getDrain())
+                {
+                    receiver.setDrain(false);
+                    assert( _draining > 0 );
+                    _draining--;
+                }
+                _blocked.add(receiver);
+            }
+        }
+        return 0;
+    }
+
+    /**
+     * Subscribes to messages from a source address.
+     *
+     * The source is routed and parsed. A passive address results in a listener
+     * on the address's host and implied port; any other address results in a
+     * receiver link being looked up or created for it.
+     *
+     * @param source the address to subscribe to
+     * @throws IllegalStateException if the messenger has no driver
+     * @throws MessengerException if the routed address has no host
+     */
+    public void subscribe(String source) throws MessengerException
+    {
+        if (_driver == null) {
+            throw new IllegalStateException("messenger is stopped");
+        }
+
+        final String routed = routeAddress(source);
+        final Address address = new Address(routed);
+
+        final String hostName = address.getHost();
+        if (hostName == null)
+        {
+            throw new MessengerException("Invalid address (hostname cannot be null): " + routed);
+        }
+        final int port = Integer.valueOf(address.getImpliedPort());
+
+        if (!address.isPassive())
+        {
+            if(_logger.isLoggable(Level.FINE))
+            {
+                _logger.fine(this + " about to subscribe to source " + source);
+            }
+            getLink(address, new ReceiverFinder(address.getName()));
+            return;
+        }
+
+        if(_logger.isLoggable(Level.FINE))
+        {
+            _logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port);
+        }
+        _driver.createListener(hostName, port, new ListenerContext(address));
+    }
+
+    /**
+     * @return the size of the outgoing store plus the number of messages
+     *         queued on sender links
+     */
+    public int outgoing()
+    {
+        return _outgoingStore.size() + queued(true);
+    }
+
+    /**
+     * @return the size of the incoming store plus the number of messages
+     *         queued on receiver links
+     */
+    public int incoming()
+    {
+        return _incomingStore.size() + queued(false);
+    }
+
+    /**
+     * @return the window currently set on the incoming store
+     */
+    public int getIncomingWindow()
+    {
+        return _incomingStore.getWindow();
+    }
+
+    /**
+     * Sets the window of the incoming store.
+     *
+     * @param window the new window value, passed through unchanged
+     */
+    public void setIncomingWindow(int window)
+    {
+        _incomingStore.setWindow(window);
+    }
+
+    /**
+     * @return the window currently set on the outgoing store
+     */
+    public int getOutgoingWindow()
+    {
+        return _outgoingStore.getWindow();
+    }
+
+    /**
+     * Sets the window of the outgoing store.
+     *
+     * @param window the new window value, passed through unchanged
+     */
+    public void setOutgoingWindow(int window)
+    {
+        _outgoingStore.setWindow(window);
+    }
+
+    /**
+     * @return the tracker recorded by the most recent successful {@code get()}
+     */
+    public Tracker incomingTracker()
+    {
+        return _incomingTracker;
+    }
+    /**
+     * @return the tracker recorded by the most recent {@code put()}
+     */
+    public Tracker outgoingTracker()
+    {
+        return _outgoingTracker;
+    }
+
+    /**
+     * Selects the store a tracker belongs to.
+     *
+     * @param tracker a tracker; cast to {@code TrackerImpl} without a check
+     * @return the outgoing store for an outgoing tracker, otherwise the
+     *         incoming store
+     */
+    private Store getTrackerStore(Tracker tracker)
+    {
+        return ((TrackerImpl)tracker).isOutgoing() ? _outgoingStore : _incomingStore;
+    }
+
+    /**
+     * Updates the tracked entry's status to {@code REJECTED} in the tracker's
+     * store.
+     *
+     * @param tracker identifies the entry by its sequence number
+     * @param flags   forwarded unchanged to {@code Store.update}
+     */
+    @Override
+    public void reject(Tracker tracker, int flags)
+    {
+        int id = ((TrackerImpl)tracker).getSequence();
+        // NOTE(review): the two trailing booleans are interpreted by Store.update,
+        // which is not visible here; settle() passes true for both.
+        getTrackerStore(tracker).update(id, Status.REJECTED, flags, false, false);
+    }
+
+    /**
+     * Updates the tracked entry's status to {@code ACCEPTED} in the tracker's
+     * store.
+     *
+     * @param tracker identifies the entry by its sequence number
+     * @param flags   forwarded unchanged to {@code Store.update}
+     */
+    @Override
+    public void accept(Tracker tracker, int flags)
+    {
+        int id = ((TrackerImpl)tracker).getSequence();
+        // NOTE(review): the two trailing booleans are interpreted by Store.update,
+        // which is not visible here; settle() passes true for both.
+        getTrackerStore(tracker).update(id, Status.ACCEPTED, flags, false, false);
+    }
+
+    /**
+     * Settles the tracked entry in the tracker's store.
+     *
+     * Unlike accept() and reject(), this passes {@code Status.UNKNOWN} and
+     * {@code true} for both trailing arguments of {@code Store.update}.
+     *
+     * @param tracker identifies the entry by its sequence number
+     * @param flags   forwarded unchanged to {@code Store.update}
+     */
+    @Override
+    public void settle(Tracker tracker, int flags)
+    {
+        int id = ((TrackerImpl)tracker).getSequence();
+        getTrackerStore(tracker).update(id, Status.UNKNOWN, flags, true, true);
+    }
+
+    /**
+     * Looks up the status of a tracked message.
+     *
+     * @param tracker identifies the entry by its sequence number
+     * @return the status of the entry in the tracker's store, or
+     *         {@code Status.UNKNOWN} when the store no longer has the entry
+     */
+    public Status getStatus(Tracker tracker)
+    {
+        final int sequence = ((TrackerImpl)tracker).getSequence();
+        final StoreEntry tracked = getTrackerStore(tracker).getEntry(sequence);
+        return (tracked == null) ? Status.UNKNOWN : tracked.getStatus();
+    }
+
+    /**
+     * Adds a routing rule; the rules are consulted by routeAddress().
+     *
+     * @param pattern pattern handed to the rule set
+     * @param address address associated with the pattern
+     */
+    @Override
+    public void route(String pattern, String address)
+    {
+        _routes.rule(pattern, address);
+    }
+
+    /**
+     * Adds a rewrite rule; the rules are consulted by rewriteMessage().
+     *
+     * @param pattern pattern handed to the rule set
+     * @param address address associated with the pattern
+     */
+    @Override
+    public void rewrite(String pattern, String address)
+    {
+        _rewrites.rule(pattern, address);
+    }
+
+    /**
+     * Adds up the queued count of all locally active links in one direction.
+     *
+     * @param outgoing {@code true} to count sender links, {@code false} to
+     *                 count receiver links
+     * @return the total across every connector, or 0 when there is no driver
+     */
+    private int queued(boolean outgoing)
+    {
+        if (_driver == null) {
+            return 0;
+        }
+
+        int total = 0;
+        for (Connector<?> c : _driver.connectors())
+        {
+            for (Link link : new Links(c.getConnection(), ACTIVE, ANY))
+            {
+                final boolean wanted = outgoing ? (link instanceof Sender)
+                                                : (link instanceof Receiver);
+                if (wanted)
+                {
+                    total += link.getQueued();
+                }
+            }
+        }
+        return total;
+    }
+
+    /**
+     * Destroys every connector collected in {@code _awaitingDestruction} and
+     * then empties that collection.
+     */
+    private void bringDestruction()
+    {
+        for (Connector<?> c : _awaitingDestruction)
+        {
+            c.destroy();
+        }
+        _awaitingDestruction.clear();
+    }
+
+    /**
+     * Runs one processing pass over every connector of the driver.
+     *
+     * Credit is distributed before and after the pass. For each connector the
+     * endpoints are processed and then the connector itself; {@code _worked}
+     * is set when a connector reports that it did something. I/O errors are
+     * logged and do not stop the pass.
+     */
+    private void processAllConnectors()
+    {
+        distributeCredit();
+        for (Connector<?> c : _driver.connectors())
+        {
+            processEndpoints(c);
+            try
+            {
+                if (c.process()) {
+                    _worked = true;
+                }
+            }
+            catch (IOException e)
+            {
+                _logger.log(Level.SEVERE, "Error processing connection", e);
+            }
+        }
+        // destroy connectors queued for destruction
+        bringDestruction();
+        distributeCredit();
+    }
+
+    /**
+     * Services the listeners and connectors the driver reports as active.
+     *
+     * Each active listener has a connection accepted and opened on it. Each
+     * active connector is either queued for destruction (when closed, after
+     * reclaiming its credit) or processed. Any activity sets {@code _worked}.
+     */
+    private void processActive()
+    {
+        //process active listeners
+        for (Listener<?> l = _driver.listener(); l != null; l = _driver.listener())
+        {
+            _worked = true;
+            Connector<?> c = l.accept();
+            Connection connection = Proton.connection();
+            connection.setContainer(_name);
+            ListenerContext ctx = (ListenerContext) l.getContext();
+            connection.setContext(new ConnectionContext(ctx.getAddress(), c));
+            c.setConnection(connection);
+            Transport transport = c.getTransport();
+            //TODO: full SASL
+            // for now the server side offers only ANONYMOUS and reports success
+            Sasl sasl = c.sasl();
+            if (sasl != null)
+            {
+                sasl.server();
+                sasl.setMechanisms(new String[]{"ANONYMOUS"});
+                sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+            }
+            transport.ssl(ctx.getDomain());
+            connection.open();
+        }
+        // process connectors, reclaiming credit on closed connectors
+        for (Connector<?> c = _driver.connector(); c != null; c = _driver.connector())
+        {
+            _worked = true;
+            if (c.isClosed())
+            {
+                _awaitingDestruction.add(c);
+                reclaimCredit(c.getConnection());
+            }
+            else
+            {
+                _logger.log(Level.FINE, "Processing active connector " + c);
+                try
+                {
+                    // process before and after handling the endpoints
+                    c.process();
+                    processEndpoints(c);
+                    c.process();
+                }
+                catch (IOException e)
+                {
+                    _logger.log(Level.SEVERE, "Error processing connection", e);
+                }
+            }
+        }
+        bringDestruction();
+        distributeCredit();
+    }
+
+    /**
+     * Brings the endpoints of one connector's connection up to date.
+     *
+     * In order: opens the connection if it is still uninitialized; walks the
+     * connection's work list, mirroring remote dispositions on sender
+     * deliveries and pulling in readable deliveries; opens uninitialized
+     * sessions and links; distributes credit; pushes stored messages out on
+     * active senders; and finally closes or reclaims whatever the remote side
+     * has closed.
+     *
+     * @param c the connector whose connection is processed
+     */
+    private void processEndpoints(Connector<?> c)
+    {
+        Connection connection = c.getConnection();
+
+        if (connection.getLocalState() == EndpointState.UNINITIALIZED)
+        {
+            connection.open();
+        }
+
+        Delivery delivery = connection.getWorkHead();
+        while (delivery != null)
+        {
+            Link link = delivery.getLink();
+            if (delivery.isUpdated())
+            {
+                if (link instanceof Sender)
+                {
+                    // adopt the state the remote side reported for this delivery
+                    delivery.disposition(delivery.getRemoteState());
+                }
+                StoreEntry e = (StoreEntry) delivery.getContext();
+                if (e != null) e.updated();
+            }
+
+            if (delivery.isReadable())
+            {
+                pumpIn( link.getSource().getAddress(), (Receiver)link );
+            }
+
+            // fetch the successor before clearing the delivery from the work list
+            Delivery next = delivery.getWorkNext();
+            delivery.clear();
+            delivery = next;
+        }
+
+        for (Session session : new Sessions(connection, UNINIT, ANY))
+        {
+            session.open();
+            _logger.log(Level.FINE, "Opened session " + session);
+        }
+        for (Link link : new Links(connection, UNINIT, ANY))
+        {
+            //TODO: the following is not correct; should only copy those properties that we understand
+            link.setSource(link.getRemoteSource());
+            link.setTarget(link.getRemoteTarget());
+            linkAdded(link);
+            link.open();
+            _logger.log(Level.FINE, "Opened link " + link);
+        }
+
+        distributeCredit();
+
+        for (Link link : new Links(connection, ACTIVE, ACTIVE))
+        {
+            if (link instanceof Sender)
+            {
+                pumpOut(link.getTarget().getAddress(), (Sender)link);
+            }
+        }
+
+        for (Session session : new Sessions(connection, ACTIVE, CLOSED))
+        {
+            session.close();
+        }
+
+        for (Link link : new Links(connection, ANY, CLOSED))
+        {
+            if (link.getLocalState() == EndpointState.ACTIVE)
+            {
+                // remote closed first: close our end
+                link.close();
+            }
+            else
+            {
+                // our end is not active: release the link's bookkeeping
+                reclaimLink(link);
+            }
+        }
+
+        if (connection.getRemoteState() == EndpointState.CLOSED)
+        {
+            if (connection.getLocalState() == EndpointState.ACTIVE)
+            {
+                connection.close();
+            }
+        }
+    }
+
+    /**
+     * Waits for a condition using the messenger's blocking settings.
+     *
+     * In blocking mode the wait lasts up to {@code _timeout} and a timeout is
+     * logged and reported as an exception. In non-blocking mode a single wait
+     * with a timeout of 0 is performed and its result returned.
+     *
+     * @param condition the predicate to wait for
+     * @return whether the condition was met
+     * @throws TimeoutException in blocking mode, when the condition was not
+     *         met within {@code _timeout}
+     */
+    private boolean waitUntil(Predicate condition) throws TimeoutException
+    {
+        if (!_blocking) {
+            return waitUntil(condition, 0);
+        }
+
+        if (waitUntil(condition, _timeout)) {
+            return true;
+        }
+
+        _logger.log(Level.SEVERE, String.format
+                    ("Timeout when waiting for condition %s after %s ms",
+                     condition, _timeout));
+        throw new TimeoutException();
+    }
+
+    /**
+     * Drives the messenger until a condition holds or a timeout expires.
+     *
+     * All connectors are processed once up front. The loop then tests the
+     * condition, lets the driver wait, and services whatever became active,
+     * until the condition is met or the deadline has passed.
+     *
+     * @param condition the predicate to wait for
+     * @param timeout   maximum wait in milliseconds; a negative value means
+     *                  no deadline
+     * @return {@code true} if the condition was met, {@code false} on timeout
+     * @throws IllegalStateException if the messenger has no driver
+     * @throws InterruptException if the driver's wait reports it was woken
+     */
+    private boolean waitUntil(Predicate condition, long timeout)
+    {
+        if (_driver == null) {
+            throw new IllegalStateException("cannot wait while messenger is stopped");
+        }
+
+        processAllConnectors();
+
+        // wait until timeout expires or until test is true
+        long now = System.currentTimeMillis();
+        final long deadline = timeout < 0 ? Long.MAX_VALUE : now + timeout;
+        boolean done = false;
+
+        while (true)
+        {
+            done = condition.test();
+            if (done) break;
+
+            // -1 stands for "no limit" when no timeout was requested
+            long remaining;
+            if (timeout < 0)
+                remaining = -1;
+            else {
+                remaining = deadline - now;
+                if (remaining < 0) break;
+            }
+
+            // Update the credit scheduler. If the scheduler detects
+            // credit imbalance on the links, wake up in time to
+            // service credit drain
+            distributeCredit();
+            if (_next_drain != 0)
+            {
+                long wakeup = (_next_drain > now) ? _next_drain - now : 0;
+                remaining = (remaining == -1) ? wakeup : Math.min(remaining, wakeup);
+            }
+
+            boolean woken;
+            woken = _driver.doWait(remaining);
+            // service active listeners/connectors before reporting the wake-up
+            processActive();
+            if (woken) {
+                throw new InterruptException();
+            }
+            now = System.currentTimeMillis();
+        }
+
+        return done;
+    }
+
+    /**
+     * Finds an existing connection for an address.
+     *
+     * @param address the address to match against each connection's context
+     * @return the first connection whose {@code ConnectionContext} matches,
+     *         or {@code null} when none does
+     */
+    private Connection lookup(Address address)
+    {
+        for (Connector<?> c : _driver.connectors())
+        {
+            Connection connection = c.getConnection();
+            ConnectionContext ctx = (ConnectionContext) connection.getContext();
+            if (ctx.matches(address))
+            {
+                return connection;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Reclaims every link of a connection, whatever its local or remote state.
+     *
+     * @param connection the connection whose links are reclaimed
+     */
+    private void reclaimCredit(Connection connection)
+    {
+        for (Link link : new Links(connection, ANY, ANY))
+        {
+            reclaimLink(link);
+        }
+    }
+
+    /**
+     * Credit scheduler for receiver links.
+     *
+     * Does nothing when there are no receivers. Otherwise it: (1) in automatic
+     * credit mode, recomputes the undistributed credit; (2) finishes drains
+     * that have completed, returning the drained credit to the pool and moving
+     * those links to the blocked list; (3) hands available credit to blocked
+     * links in batches of {@code perLinkCredit()}; and (4) if links remain
+     * blocked and no drain is in progress, schedules a drain and, once the
+     * scheduled time has passed, requests it on credited links.
+     */
+    private void distributeCredit()
+    {
+        if (_receivers == 0) return;
+
+        if (_credit_mode == LinkCreditMode.LINK_CREDIT_AUTO)
+        {
+            // replenish, but limit the max total messages buffered
+            final int max = _receivers * _credit_batch;
+            final int used = _distributed + incoming();
+            if (max > used)
+                _credit = max - used;
+        }
+
+        // reclaim any credit left over after draining links has completed
+        if (_draining > 0)
+        {
+            Iterator<Receiver> itr = _credited.iterator();
+            while (itr.hasNext())
+            {
+                Receiver link = (Receiver) itr.next();
+                if (link.getDrain())
+                {
+                    if (!link.draining())
+                    {
+                        // drain completed for this link
+                        int drained = link.drained();
+                        assert(_distributed >= drained);
+                        _distributed -= drained;
+                        _credit += drained;
+                        link.setDrain(false);
+                        _draining--;
+                        // remove via the iterator, as the list is being iterated
+                        itr.remove();
+                        _blocked.add(link);
+                    }
+                }
+            }
+        }
+
+        // distribute available credit to blocked links
+        final int batch = perLinkCredit();
+        while (_credit > 0 && !_blocked.isEmpty())
+        {
+            // take blocked links in order from the front of the list
+            Receiver link = _blocked.get(0);
+            _blocked.remove(0);
+
+            final int more = Math.min(_credit, batch);
+            _distributed += more;
+            _credit -= more;
+
+            link.flow(more);
+            _credited.add(link);
+
+            // flow changed, must process it
+            ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
+            try
+            {
+                ctx.getConnector().process();
+            } catch (IOException e) {
+                _logger.log(Level.SEVERE, "Error processing connection", e);
+            }
+        }
+
+        if (_blocked.isEmpty())
+        {
+            // nothing is waiting for credit: cancel any scheduled drain
+            _next_drain = 0;
+        }
+        else
+        {
+            // not enough credit for all links - start draining granted credit
+            if (_draining == 0)
+            {
+                // don't do it too often - pace ourselves (it's expensive)
+                if (_next_drain == 0)
+                {
+                    _next_drain = System.currentTimeMillis() + 250;
+                }
+                else if (_next_drain <= System.currentTimeMillis())
+                {
+                    // initiate drain, free up at most enough to satisfy blocked
+                    _next_drain = 0;
+                    int needed = _blocked.size() * batch;
+
+                    for (Receiver link : _credited)
+                    {
+                        if (!link.getDrain()) {
+                            link.setDrain(true);
+                            needed -= link.getRemoteCredit();
+                            _draining++;
+                            // drain requested on link, must process it
+                            ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
+                            try
+                            {
+                                ctx.getConnector().process();
+                            } catch (IOException e) {
+                                _logger.log(Level.SEVERE, "Error processing connection", e);
+                            }
+                            if (needed <= 0) break;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * A condition that {@code waitUntil} evaluates repeatedly until it holds.
+     */
+    private interface Predicate
+    {
+        /** @return {@code true} once the condition is satisfied */
+        boolean test();
+    }
+
+    /**
+     * Predicate satisfied once the number of outgoing messages that are still
+     * stored, queued on sender links, or awaiting a remote outcome is at most
+     * {@code _sendThreshold}.
+     */
+    private class SentSettled implements Predicate
+    {
+        /**
+         * @return {@code true} when the count of unsent or unsettled outgoing
+         *         messages does not exceed {@code _sendThreshold}
+         */
+        public boolean test()
+        {
+            //are all sent messages settled?
+            int total = _outgoingStore.size();
+
+            for (Connector<?> c : _driver.connectors())
+            {
+                // TBD
+                // check if transport is done generating output
+                // pn_transport_t *transport = pn_connector_transport(ctor);
+                // if (transport) {
+                //    if (!pn_transport_quiesced(transport)) {
+                //        pn_connector_process(ctor);
+                //        return false;
+                //    }
+                // }
+
+                Connection connection = c.getConnection();
+                for (Link link : new Links(connection, ACTIVE, ANY))
+                {
+                    if (link instanceof Sender)
+                    {
+                        total += link.getQueued();
+                    }
+                }
+            }
+
+            // TBD: there is no per-link unsettled
+            // deliveries iterator, so for now get the
+            // deliveries by walking the outgoing trackers.
+            // The tracked entries are not specific to a connector, so this
+            // walk runs once, after the connector loop; running it per
+            // connector would count every unsettled delivery several times.
+            Iterator<StoreEntry> entries = _outgoingStore.trackedEntries();
+            while (entries.hasNext() && total <= _sendThreshold)
+            {
+                StoreEntry e = entries.next();
+                if (e != null )
+                {
+                    Delivery d = e.getDelivery();
+                    if (d != null)
+                    {
+                        // no remote outcome yet and not settled by the peer
+                        if (d.getRemoteState() == null && !d.remotelySettled())
+                        {
+                            total++;
+                        }
+                    }
+                }
+            }
+            return total <= _sendThreshold;
+        }
+    }
+
+    /**
+     * Predicate satisfied when a message can be fetched, or when none can
+     * ever arrive because there are neither listeners nor connectors.
+     */
+    private class MessageAvailable implements Predicate
+    {
+        public boolean test()
+        {
+            // anything already sitting in the incoming store counts
+            if (_incomingStore.size() > 0)
+            {
+                return true;
+            }
+
+            // otherwise look for a complete, readable delivery on any work list
+            for (Connector<?> c : _driver.connectors())
+            {
+                final Connection connection = c.getConnection();
+                for (Delivery d = connection.getWorkHead(); d != null; d = d.getWorkNext())
+                {
+                    if (d.isReadable() && !d.isPartial())
+                    {
+                        return true;
+                    }
+                }
+            }
+
+            // if no connections, or not listening, exit as there won't ever be a message
+            return !_driver.listeners().iterator().hasNext()
+                && !_driver.connectors().iterator().hasNext();
+        }
+    }
+
+    /**
+     * Predicate satisfied once the messenger has no driver, or once every
+     * connector of the driver is closed.
+     *
+     * This test has a side effect: the first time all connectors are found
+     * closed it destroys the driver and sets {@code _driver} to {@code null}.
+     */
+    private class AllClosed implements Predicate
+    {
+        public boolean test()
+        {
+            // already torn down
+            if (_driver == null) {
+                return true;
+            }
+
+            for (Connector<?> c : _driver.connectors()) {
+                if (!c.isClosed()) {
+                    return false;
+                }
+            }
+
+            // everything is closed: tear the driver down
+            _driver.destroy();
+            _driver = null;
+
+            return true;
+        }
+    }
+
+    // Set to true whenever processing did something; reset by work().
+    private boolean _worked = false;
+
+    /**
+     * Predicate that reports the {@code _worked} flag.
+     */
+    private class WorkPred implements Predicate
+    {
+        public boolean test()
+        {
+            return _worked;
+        }
+    }
+
+    // Shared predicate instances handed to waitUntil(): used by send(),
+    // recv(), the stop/stopped logic and work() respectively.
+    private final SentSettled _sentSettled = new SentSettled();
+    private final MessageAvailable _messageAvailable = new MessageAvailable();
+    private final AllClosed _allClosed = new AllClosed();
+    private final WorkPred _workPred = new WorkPred();
+
+    /**
+     * Strategy used by getLink() to reuse an existing link or create a new one.
+     *
+     * @param <C> the kind of link produced
+     */
+    private interface LinkFinder<C extends Link>
+    {
+        /** @return {@code link} cast to {@code C} if it is suitable, otherwise {@code null} */
+        C test(Link link);
+        /** @return a new link created on {@code session} */
+        C create(Session session);
+    }
+
+    /**
+     * {@link LinkFinder} for sender links addressed to a given path.
+     */
+    private class SenderFinder implements LinkFinder<Sender>
+    {
+        private final String _path;
+
+        /**
+         * @param path target path; {@code null} is treated as the empty string
+         */
+        SenderFinder(String path)
+        {
+            if (path == null)
+            {
+                _path = "";
+            }
+            else
+            {
+                _path = path;
+            }
+        }
+
+        /**
+         * @return the link as a sender when it is one and its target matches
+         *         the path, otherwise {@code null}
+         */
+        public Sender test(Link link)
+        {
+            final boolean usable = link instanceof Sender
+                && matchTarget((Target) link.getTarget(), _path);
+            return usable ? (Sender) link : null;
+        }
+
+        /**
+         * Creates a sender named after the path, with both target and source
+         * addressed to it.
+         */
+        public Sender create(Session session)
+        {
+            final Sender created = session.sender(_path);
+
+            final Target target = new Target();
+            target.setAddress(_path);
+            created.setTarget(target);
+
+            // the C implementation also sets a source with the same address
+            final Source source = new Source();
+            source.setAddress(_path);
+            created.setSource(source);
+
+            final boolean windowed = getOutgoingWindow() > 0;
+            if (windowed)
+            {
+                // use explicit settlement via dispositions (not pre-settled)
+                created.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+                created.setReceiverSettleMode(ReceiverSettleMode.SECOND);  // desired
+            }
+            return created;
+        }
+    }
+
+    /**
+     * {@link LinkFinder} for receiver links fed from a given path.
+     */
+    private class ReceiverFinder implements LinkFinder<Receiver>
+    {
+        private final String _path;
+
+        /**
+         * @param path source path; {@code null} is treated as the empty string
+         */
+        ReceiverFinder(String path)
+        {
+            if (path == null)
+            {
+                _path = "";
+            }
+            else
+            {
+                _path = path;
+            }
+        }
+
+        /**
+         * @return the link as a receiver when it is one and its source matches
+         *         the path, otherwise {@code null}
+         */
+        public Receiver test(Link link)
+        {
+            final boolean usable = link instanceof Receiver
+                && matchSource((Source) link.getSource(), _path);
+            return usable ? (Receiver) link : null;
+        }
+
+        /**
+         * Creates a receiver named after the path, with both source and target
+         * addressed to it.
+         */
+        public Receiver create(Session session)
+        {
+            final Receiver created = session.receiver(_path);
+
+            final Source source = new Source();
+            source.setAddress(_path);
+            created.setSource(source);
+
+            // the C implementation also sets a target with the same address
+            final Target target = new Target();
+            target.setAddress(_path);
+            created.setTarget(target);
+
+            final boolean windowed = getIncomingWindow() > 0;
+            if (windowed)
+            {
+                // use explicit settlement via dispositions (not pre-settled)
+                created.setSenderSettleMode(SenderSettleMode.UNSETTLED);  // desired
+                created.setReceiverSettleMode(ReceiverSettleMode.SECOND);
+            }
+            return created;
+        }
+    }
+
+    /**
+     * Returns a link for an address, creating the connection, session and
+     * link as needed.
+     *
+     * An existing connection matching the address is reused; otherwise a new
+     * connector and connection are set up (ANONYMOUS SASL on the client side,
+     * plus SSL for the {@code amqps} scheme) and opened. Among the locally
+     * active links of the connection the first one accepted by the finder is
+     * returned; if there is none, a new session is opened and the finder
+     * creates a new link on it.
+     *
+     * @param address the address to connect to
+     * @param finder  decides which existing link is suitable and how to create
+     *                a new one
+     * @return an existing or newly opened link
+     */
+    private <C extends Link> C getLink(Address address, LinkFinder<C> finder)
+    {
+        Connection connection = lookup(address);
+        if (connection == null)
+        {
+            String host = address.getHost();
+            int port = Integer.valueOf(address.getImpliedPort());
+            Connector<?> connector = _driver.createConnector(host, port, null);
+            if (_logger.isLoggable(Level.FINE))
+            {
+                _logger.log(Level.FINE, "Connecting to " + host + ":" + port);
+            }
+            connection = Proton.connection();
+            connection.setContainer(_name);
+            connection.setHostname(host);
+            connection.setContext(new ConnectionContext(address, connector));
+            connector.setConnection(connection);
+            Sasl sasl = connector.sasl();
+            if (sasl != null)
+            {
+                sasl.client();
+                sasl.setMechanisms(new String[]{"ANONYMOUS"});
+            }
+            if ("amqps".equalsIgnoreCase(address.getScheme())) {
+                Transport transport = connector.getTransport();
+                SslDomain domain = makeDomain(address, SslDomain.Mode.CLIENT);
+                if (_trustedDb != null) {
+                    domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
+                    //domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER_NAME);
+                } else {
+                    domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER);
+                }
+                // The Ssl object returned here is not used. Peer host name
+                // checking (Ssl.setPeerHostname(host), together with
+                // VERIFY_PEER_NAME above) is currently left disabled.
+                transport.ssl(domain);
+            }
+            connection.open();
+        }
+
+        for (Link link : new Links(connection, ACTIVE, ANY))
+        {
+            C result = finder.test(link);
+            if (result != null) return result;
+        }
+        Session session = connection.session();
+        session.open();
+        C link = finder.create(session);
+        linkAdded(link);
+        link.open();
+        return link;
+    }
+
+    /** Iterable over the links a connection yields for the given local and remote state sets. */
+    private static class Links implements Iterable<Link> {
+        private final Connection _connection;
+        private final EnumSet<EndpointState> _local;
+        private final EnumSet<EndpointState> _remote;
+
+        /** Captures the connection and the two state sets handed to every iterator. */
+        Links(Connection conn, EnumSet<EndpointState> localStates, EnumSet<EndpointState> remoteStates) {
+            this._connection = conn;
+            this._local = localStates;
+            this._remote = remoteStates;
+        }
+
+        /** Creates a new {@code LinkIterator} over the captured connection and state sets. */
+        public java.util.Iterator<Link> iterator() {
+            return new LinkIterator(_connection, _local, _remote);
+        }
+    }
+
+    /**
+     * Walks the chain of links obtained from {@code Connection.linkHead} and {@code Link.next},
+     * passing the same local and remote state sets to every call.
+     */
+    private static class LinkIterator implements java.util.Iterator<Link> {
+        private final EnumSet<EndpointState> _local;
+        private final EnumSet<EndpointState> _remote;
+        private Link _next;
+
+        LinkIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
+            _local = local;
+            _remote = remote;
+            _next = connection.linkHead(_local, _remote);
+        }
+
+        public boolean hasNext() {
+            return _next != null;
+        }
+
+        /** Returns the current link and advances to the one that {@code Link.next} reports. */
+        public Link next() {
+            if (_next == null) {
+                // The Iterator contract asks for this exception once the iteration is exhausted.
+                throw new java.util.NoSuchElementException();
+            }
+            Link current = _next;
+            _next = current.next(_local, _remote);
+            return current;
+        }
+
+        /** Removal is not supported. */
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /** Iterable over the sessions a connection yields for the given local and remote state sets. */
+    private static class Sessions implements Iterable<Session> {
+        private final Connection _connection;
+        private final EnumSet<EndpointState> _local;
+        private final EnumSet<EndpointState> _remote;
+
+        /** Captures the connection and the two state sets handed to every iterator. */
+        Sessions(Connection conn, EnumSet<EndpointState> localStates, EnumSet<EndpointState> remoteStates) {
+            this._connection = conn;
+            this._local = localStates;
+            this._remote = remoteStates;
+        }
+
+        /** Creates a new {@code SessionIterator} over the captured connection and state sets. */
+        public java.util.Iterator<Session> iterator() {
+            return new SessionIterator(_connection, _local, _remote);
+        }
+    }
+
+    /**
+     * Walks the chain of sessions obtained from {@code Connection.sessionHead} and
+     * {@code Session.next}, passing the same local and remote state sets to every call.
+     */
+    private static class SessionIterator implements java.util.Iterator<Session> {
+        private final EnumSet<EndpointState> _local;
+        private final EnumSet<EndpointState> _remote;
+        private Session _next;
+
+        SessionIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
+            _local = local;
+            _remote = remote;
+            _next = connection.sessionHead(_local, _remote);
+        }
+
+        public boolean hasNext() {
+            return _next != null;
+        }
+
+        /** Returns the current session and advances to the one that {@code Session.next} reports. */
+        public Session next() {
+            if (_next == null) {
+                // The Iterator contract asks for this exception once the iteration is exhausted.
+                throw new java.util.NoSuchElementException();
+            }
+            Session current = _next;
+            _next = current.next(_local, _remote);
+            return current;
+        }
+
+        /** Removal is not supported. */
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Expands the "~" shorthand in the message's reply-to using {@code _name}: "~/rest" becomes
+     * "amqp://" + _name + "/rest" and a bare "~" becomes "amqp://" + _name. A null reply-to, or
+     * any other value, is left untouched.
+     */
+    private void adjustReplyTo(Message m) {
+        String replyTo = m.getReplyTo();
+        if (replyTo == null) return;
+        if (replyTo.startsWith("~/")) {
+            m.setReplyTo("amqp://" + _name + "/" + replyTo.substring(2));
+        } else if ("~".equals(replyTo)) {
+            m.setReplyTo("amqp://" + _name);
+        }
+    }
+
+    /** True when {@code path} equals the target's address; a null target matches only "". */
+    private static boolean matchTarget(Target target, String path)
+    {
+        return (target == null) ? path.isEmpty() : path.equals(target.getAddress());
+    }
+
+    /** True when {@code path} equals the source's address; a null source matches only "". */
+    private static boolean matchSource(Source source, String path)
+    {
+        return (source == null) ? path.isEmpty() : path.equals(source.getAddress());
+    }
+
+    /**
+     * Renders this messenger as {@code MessengerImpl [_name=<name>]}.
+     */
+    @Override
+    public String toString() {
+        return "MessengerImpl [_name=" + _name + "]";
+    }
+
+    /**
+     * Computes the maximum amount of credit each receiving link is entitled to: the sum of
+     * {@code _credit} and {@code _distributed} divided among {@code _receivers}, but at least 1,
+     * or 0 when there are no receivers. The actual credit given to a link depends on what
+     * amount of credit is actually available.
+     */
+    private int perLinkCredit() {
+        return (_receivers == 0) ? 0 : Math.max((_credit + _distributed) / _receivers, 1);
+    }
+
+    /**
+     * Accounts for a newly created link. A {@link Receiver} increments {@code _receivers}, is
+     * added to {@code _blocked} and gets {@code Boolean.TRUE} as its context; others are ignored.
+     */
+    private void linkAdded(Link link) {
+        if (!(link instanceof Receiver)) return;
+        _receivers++;
+        _blocked.add((Receiver) link);
+        link.setContext(Boolean.TRUE);
+    }
+
+    /**
+     * Accounts for a link being removed. Acts only on a {@link Receiver} whose context is
+     * {@code Boolean.TRUE}; a null or non-Boolean context is skipped instead of throwing.
+     */
+    private void linkRemoved(Link _link) {
+        if (!(_link instanceof Receiver) || !Boolean.TRUE.equals(_link.getContext())) return;
+        _link.setContext(Boolean.FALSE);   // makes a repeated call for this link a no-op
+        Receiver link = (Receiver) _link;
+        assert _receivers > 0;
+        _receivers--;
+        if (link.getDrain()) {
+            link.setDrain(false);
+            assert _draining > 0;
+            _draining--;
+        }
+        if (_blocked.contains(link)) {
+            _blocked.remove(link);
+        } else if (_credited.contains(link)) {
+            _credited.remove(link);
+        } else {
+            assert false;   // a tracked receiver must be in one of the two collections
+        }
+    }
+
+    /** Context object attached to a connection: the address it was created for and its connector. */
+    private class ConnectionContext {
+        private Address _address;
+        private Connector _connector;
+
+        public ConnectionContext(Address address, Connector connector) {
+            this._address = address;
+            this._connector = connector;
+        }
+
+        public Address getAddress() {
+            return _address;
+        }
+
+        /**
+         * True when the address host equals the connection's remote container, or when its host
+         * and implied port both equal those of the address stored in this context.
+         */
+        public boolean matches(Address address) {
+            String wantedHost = address.getHost();
+            String wantedPort = address.getImpliedPort();
+            Connection conn = _connector.getConnection();
+            if (wantedHost.equals(conn.getRemoteContainer())) return true;
+            return _address.getHost().equals(wantedHost) && _address.getImpliedPort().equals(wantedPort);
+        }
+
+        public Connector getConnector() {
+            return _connector;
+        }
+    }
+
+    /**
+     * Builds an SSL domain initialised with {@code mode}. Credentials and the trusted CA database
+     * are applied only when {@code _certificate} / {@code _trustedDb} are non-null; unsecured
+     * clients are allowed for every scheme except "amqps" (compared case-insensitively).
+     * @param address address whose scheme decides whether unsecured clients are allowed
+     * @param mode    mode passed to {@code SslDomain.init}
+     */
+    private SslDomain makeDomain(Address address, SslDomain.Mode mode) {
+        SslDomain sslDomain = Proton.sslDomain();
+        sslDomain.init(mode);
+        if (_certificate != null) {
+            sslDomain.setCredentials(_certificate, _privateKey, _password);
+        }
+        if (_trustedDb != null) {
+            sslDomain.setTrustedCaDb(_trustedDb);
+        }
+        sslDomain.allowUnsecuredClient(!"amqps".equalsIgnoreCase(address.getScheme()));
+        return sslDomain;
+    }
+
+
+    /** Pairs an address with the SSL domain built for it in SERVER mode. */
+    private class ListenerContext {
+        private Address _address;
+        private SslDomain _domain;
+
+        /** Stores the address and derives its domain via {@code makeDomain} in SERVER mode. */
+        public ListenerContext(Address address) {
+            this._address = address;
+            this._domain = makeDomain(address, SslDomain.Mode.SERVER);
+        }
+
+        /** Returns the SSL domain created in the constructor. */
+        public SslDomain getDomain() {
+            return _domain;
+        }
+
+        /** Returns the address passed to the constructor. */
+        public Address getAddress() {
+            return _address;
+        }
+
+    }
+}