Merge from trunk

git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@825292 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/cpp/CMakeLists.txt b/qpid/cpp/CMakeLists.txt
index 18a7616..b15a60c 100644
--- a/qpid/cpp/CMakeLists.txt
+++ b/qpid/cpp/CMakeLists.txt
@@ -101,6 +101,27 @@
        "Directory to load client plug-in modules from")
   set (QPIDD_MODULE_DIR plugins/broker CACHE STRING
        "Directory to load broker plug-in modules from")
+
+  # The WCF/C++ client is built separately (it doesn't have a CMakeLists.txt)
+  # but installed with the C++ components on Windows.
+  # Don't freak out if it's not there (but it may be good to freak out if
+  # building the real one...)
+  install (PROGRAMS
+           ../wcf/src/Apache/Qpid/Channel/bin/Debug/Apache.Qpid.Channel.dll
+           ../wcf/src/Apache/Qpid/Channel/bin/Debug/Apache.Qpid.Interop.dll
+           DESTINATION ${QPID_INSTALL_LIBDIR}
+           COMPONENT ${QPID_COMPONENT_CLIENT}
+           OPTIONAL)
+# Not sure about this syntax yet... or how to only do it if Client is installed.
+#  set (CPACK_NSIS_EXTRA_INSTALL_COMMANDS "
+#       gacutil -I '$INSTDIR\\${QPID_INSTALL_LIBDIR}\\Apache.Qpid.Channel.dll'
+#       gacutil -I '$INSTDIR\\${QPID_INSTALL_LIBDIR}\\Apache.Qpid.Interop.dll'
+#      ")
+#  set (CPACK_NSIS_EXTRA_UNINSTALL_COMMANDS "
+#       gacutil /u 'Apache.Qpid.Channel'
+#       gacutil /u 'Apache.Qpid.Interop'
+#      ")
+
 endif (WIN32)
 if (CMAKE_SYSTEM_NAME STREQUAL Linux)
   # Set up install locations. Since the Linux install puts some files in
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 6deacbb..0c8606c 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -333,6 +333,7 @@
     qpid/sys/windows/PollableCondition.cpp
     qpid/sys/windows/Shlib.cpp
     qpid/sys/windows/Socket.cpp
+    qpid/sys/windows/SocketAddress.cpp
     qpid/sys/windows/StrError.cpp
     qpid/sys/windows/SystemInfo.cpp
     qpid/sys/windows/Thread.cpp
@@ -387,6 +388,7 @@
     qpid/sys/posix/Shlib.cpp
     qpid/log/posix/SinkOptions.cpp
     qpid/sys/posix/Socket.cpp
+    qpid/sys/posix/SocketAddress.cpp
     qpid/sys/posix/StrError.cpp
     qpid/sys/posix/SystemInfo.cpp
     qpid/sys/posix/Thread.cpp
@@ -738,7 +740,7 @@
 if (MSVC)
   set_target_properties (qpidbroker PROPERTIES COMPILE_FLAGS /wd4290)
 endif (MSVC)
-install (TARGETS qpidbroker LIBRARY
+install (TARGETS qpidbroker
          DESTINATION ${QPID_INSTALL_LIBDIR}
          COMPONENT ${QPID_COMPONENT_BROKER})
 
@@ -772,7 +774,7 @@
 target_link_libraries (qmf qmfengine)
 set_target_properties (qmf PROPERTIES
                        VERSION ${qmf_version})
-install (TARGETS qmf LIBRARY
+install (TARGETS qmf OPTIONAL
          DESTINATION ${QPID_INSTALL_LIBDIR}
          COMPONENT ${QPID_COMPONENT_QMF})
 
@@ -806,7 +808,7 @@
 target_link_libraries (qmfengine qpidclient)
 set_target_properties (qmfengine PROPERTIES
                        VERSION ${qmfengine_version})
-install (TARGETS qmfengine
+install (TARGETS qmfengine OPTIONAL
          DESTINATION ${QPID_INSTALL_LIBDIR}
          COMPONENT ${QPID_COMPONENT_QMF})
 
diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp
index aa93457..3a20087 100644
--- a/qpid/cpp/src/posix/QpiddBroker.cpp
+++ b/qpid/cpp/src/posix/QpiddBroker.cpp
@@ -144,8 +144,16 @@
             return 1;
         if (myOptions->daemon.check)
             cout << pid << endl;
-        if (myOptions->daemon.quit && kill(pid, SIGINT) < 0)
-          throw Exception("Failed to stop daemon: " + qpid::sys::strError(errno));
+        if (myOptions->daemon.quit) {
+            if (kill(pid, SIGINT) < 0) 
+                throw Exception("Failed to stop daemon: " + qpid::sys::strError(errno));
+            // Wait for the process to die before returning
+            int retry=10000;    // Try up to 10 seconds
+            while (kill(pid,0) == 0 && --retry)
+                sys::usleep(1000);
+            if (retry == 0)
+                throw Exception("Gave up waiting for daemon process to exit");
+        }
         return 0;
     }
 
diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h
index d108402..76b993f 100644
--- a/qpid/cpp/src/qpid/sys/Socket.h
+++ b/qpid/cpp/src/qpid/sys/Socket.h
@@ -39,12 +39,9 @@
     /** Create a socket wrapper for descriptor. */
     QPID_COMMON_EXTERN Socket();
 
-    /** Create an initialized TCP socket */
-    void createTcp() const;
-    
     /** Set timeout for read and write */
     void setTimeout(const Duration& interval) const;
-    
+
     /** Set socket non blocking */
     void setNonblocking() const;
 
@@ -59,7 +56,8 @@
      *@return The bound port.
      */
     QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const;
-    
+    QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const;
+
     /** Returns the "socket name" ie the address bound to 
      * the near end of the socket
      */
@@ -102,8 +100,12 @@
     QPID_COMMON_EXTERN void setTcpNoDelay(bool nodelay) const;
 
 private:
+    /** Create socket */
+    void createSocket(const SocketAddress&) const;
+
     Socket(IOHandlePrivate*);
     mutable std::string connectname;
+    mutable bool nonblocking;
 };
 
 }}
diff --git a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index 9fd0602..fd9a4b3 100644
--- a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -60,19 +60,23 @@
         DELETED
     };
 
-    int fd;
     ::__uint32_t events;
+    const IOHandlePrivate* ioHandle;
     PollerHandle* pollerHandle;
     FDStat stat;
     Mutex lock;
 
-    PollerHandlePrivate(int f, PollerHandle* p) :
-      fd(f),
+    PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) :
       events(0),
+      ioHandle(h),
       pollerHandle(p),
       stat(ABSENT) {
     }
 
+    int fd() const {
+        return toFd(ioHandle);
+    }
+
     bool isActive() const {
         return stat == MONITORED || stat == MONITORED_HUNGUP;
     }
@@ -131,7 +135,7 @@
 };
 
 PollerHandle::PollerHandle(const IOHandle& h) :
-    impl(new PollerHandlePrivate(toFd(h.impl), this))
+    impl(new PollerHandlePrivate(h.impl, this))
 {}
 
 PollerHandle::~PollerHandle() {
@@ -303,7 +307,7 @@
     epe.data.u64 = 0; // Keep valgrind happy
     epe.data.ptr = &eh;
 
-    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd, &epe));
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd(), &epe));
 
     eh.setActive();
 }
@@ -313,7 +317,7 @@
     ScopedLock<Mutex> l(eh.lock);
     assert(!eh.isIdle());
 
-    int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
+    int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd(), 0);
     // Ignore EBADF since deleting a nonexistent fd has the overall required result!
     // And allows the case where a sloppy program closes the fd and then does the delFd()
     if (rc == -1 && errno != EBADF) {
@@ -344,7 +348,7 @@
         epe.data.u64 = 0; // Keep valgrind happy
         epe.data.ptr = &eh;
 
-        QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+        QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
 
         eh.setActive();
         return;
@@ -382,7 +386,7 @@
     epe.data.u64 = 0; // Keep valgrind happy
     epe.data.ptr = &eh;
 
-    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
 }
 
 void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
@@ -408,7 +412,7 @@
     epe.data.u64 = 0; // Keep valgrind happy
     epe.data.ptr = &eh;
 
-    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
 }
 
 void Poller::shutdown() {
@@ -443,7 +447,7 @@
         epe.events = 0;
         epe.data.u64 = 0; // Keep valgrind happy
         epe.data.ptr = &eh;
-        QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+        QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
 
         if (eh.isInactive()) {
             eh.setInterrupted();
diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
index 02004b1..481aa6c 100644
--- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
@@ -97,22 +97,30 @@
 }
 
 Socket::Socket() :
-	IOHandle(new IOHandlePrivate)
-{
-	createTcp();
-}
-
-Socket::Socket(IOHandlePrivate* h) :
-	IOHandle(h)
+    IOHandle(new IOHandlePrivate),
+    nonblocking(false)
 {}
 
-void Socket::createTcp() const
+Socket::Socket(IOHandlePrivate* h) :
+    IOHandle(h),
+    nonblocking(false)
+{}
+
+void Socket::createSocket(const SocketAddress& sa) const
 {
     int& socket = impl->fd;
     if (socket != -1) Socket::close();
-    int s = ::socket (AF_INET, SOCK_STREAM, 0);
+    int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0);
     if (s < 0) throw QPID_POSIX_ERROR(errno);
     socket = s;
+
+    try {
+        if (nonblocking) setNonblocking();
+    } catch (std::exception&) {
+        ::close(s);
+        socket = -1;
+        throw;
+    }
 }
 
 void Socket::setTimeout(const Duration& interval) const
@@ -125,7 +133,9 @@
 }
 
 void Socket::setNonblocking() const {
-    QPID_POSIX_CHECK(::fcntl(impl->fd, F_SETFL, O_NONBLOCK));
+    int& socket = impl->fd;
+    if (socket != -1) QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK));
+    nonblocking = true;
 }
 
 void Socket::connect(const std::string& host, uint16_t port) const
@@ -138,8 +148,9 @@
 {
     connectname = addr.asString();
 
-    const int& socket = impl->fd;
+    createSocket(addr);
 
+    const int& socket = impl->fd;
     // TODO the correct thing to do here is loop on failure until you've used all the returned addresses
     if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) &&
         (errno != EINPROGRESS)) {
@@ -158,15 +169,22 @@
 
 int Socket::listen(uint16_t port, int backlog) const
 {
+    SocketAddress sa("", boost::lexical_cast<std::string>(port));
+
+    createSocket(sa);
+    return listen(sa, backlog);
+}
+
+int Socket::listen(const SocketAddress& sa, int backlog) const
+{
     const int& socket = impl->fd;
     int yes=1;
     QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
 
-    SocketAddress sa("", boost::lexical_cast<std::string>(port));
     if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0)
-        throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno)));
+        throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno)));
     if (::listen(socket, backlog) < 0)
-        throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno)));
+        throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno)));
 
     struct sockaddr_in name;
     socklen_t namelen = sizeof(name);
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 475b186..971f0bb 100644
--- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -216,7 +216,7 @@
         connCallback(socket);
     } catch(std::exception& e) {
         if (failCallback)
-            failCallback(-1, std::string(e.what()));
+            failCallback(socket, -1, std::string(e.what()));
         socket.close();
         delete &socket;
     }
diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
index 18fa7c3..8e6233b 100755
--- a/qpid/cpp/src/qpid/sys/windows/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
@@ -20,19 +20,18 @@
  */
 
 #include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
 #include "qpid/sys/windows/IoHandlePrivate.h"
 #include "qpid/sys/windows/check.h"
 #include "qpid/sys/Time.h"
 
 #include <cstdlib>
 #include <string.h>
-#include <iostream>
-#include <memory.h>
 
 #include <winsock2.h>
-#include <ws2tcpip.h>
 
 #include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
 
 // Need to initialize WinSock. Ideally, this would be a singleton or embedded
 // in some one-time initialization function. I tried boost singleton and could
@@ -138,20 +137,36 @@
 Socket::Socket() :
 	IOHandle(new IOHandlePrivate)
 {
-	createTcp();
+    SOCKET& socket = impl->fd;
+    if (socket != INVALID_SOCKET) Socket::close();
+    SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
+    if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+    socket = s;
 }
 
 Socket::Socket(IOHandlePrivate* h) :
 	IOHandle(h)
 {}
 
-void Socket::createTcp() const
+void
+Socket::createSocket(const SocketAddress& sa) const
 {
     SOCKET& socket = impl->fd;
     if (socket != INVALID_SOCKET) Socket::close();
-    SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
+
+    SOCKET s = ::socket (getAddrInfo(sa).ai_family,
+                         getAddrInfo(sa).ai_socktype,
+                         0);
     if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
     socket = s;
+
+    try {
+        if (nonblocking) setNonblocking();
+    } catch (std::exception&) {
+        closesocket(s);
+        socket = INVALID_SOCKET;
+        throw;
+    }
 }
 
 void Socket::setTimeout(const Duration& interval) const
@@ -175,41 +190,26 @@
 
 void Socket::connect(const std::string& host, uint16_t port) const
 {
-    std::stringstream portstream;
-    portstream << port << std::ends;
-    std::string portstr = portstream.str();
-    std::stringstream namestream;
-    namestream << host << ":" << port;
-    connectname = namestream.str();
+    SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+    connect(sa);
+}
 
+void
+Socket::connect(const SocketAddress& addr) const
+{
     const SOCKET& socket = impl->fd;
-    // TODO: Be good to make this work for IPv6 as well as IPv4. Would require
-    // other changes, such as waiting to create the socket until after we
-    // have the address family. Maybe unbundle the translation of names here;
-    // use TcpAddress to resolve things and make this class take a TcpAddress
-    // and grab its address family to create the socket.
-    struct addrinfo hints;
-    memset(&hints, 0, sizeof(hints));
-    hints.ai_family = AF_INET;   // We always creating AF_INET-only sockets.
-    hints.ai_socktype = SOCK_STREAM; // We always do TCP
-    addrinfo *addrs;
-    int status = getaddrinfo(host.c_str(), portstr.c_str(), &hints, &addrs);
-    if (status != 0)
-        throw Exception(QPID_MSG("Cannot resolve " << host << ": " <<
-                                 gai_strerror(status)));
-    addrinfo *addr = addrs;
+    const addrinfo *addrs = &(getAddrInfo(addr));
     int error = 0;
     WSASetLastError(0);
-    while (addr != 0) {
-        if ((::connect(socket, addr->ai_addr, addr->ai_addrlen) == 0) ||
+    while (addrs != 0) {
+        if ((::connect(socket, addrs->ai_addr, addrs->ai_addrlen) == 0) ||
             (WSAGetLastError() == WSAEWOULDBLOCK))
             break;
         // Error... save this error code and see if there are other address
         // to try before throwing the exception.
         error = WSAGetLastError();
-        addr = addr->ai_next;
+        addrs = addrs->ai_next;
     }
-    freeaddrinfo(addrs);
     if (error)
         throw qpid::Exception(QPID_MSG(strError(error) << ": " << connectname));
 }
diff --git a/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
new file mode 100644
index 0000000..a3e03c9
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/SocketAddress.h"
+
+#include "qpid/sys/windows/check.h"
+
+#include <ws2tcpip.h>
+#include <string.h>
+
+namespace qpid {
+namespace sys {
+
+SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) :
+    host(host0),
+    port(port0),
+    addrInfo(0)
+{
+    ::addrinfo hints;
+    ::memset(&hints, 0, sizeof(hints));
+    hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well
+    hints.ai_socktype = SOCK_STREAM;
+
+    const char* node = 0;
+    if (host.empty()) {
+        hints.ai_flags |= AI_PASSIVE;
+    } else {
+        node = host.c_str();
+    }
+    const char* service = port.empty() ? "0" : port.c_str();
+
+    int n = ::getaddrinfo(node, service, &hints, &addrInfo);
+    if (n != 0)
+        throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+}
+
+SocketAddress::~SocketAddress()
+{
+    ::freeaddrinfo(addrInfo);
+}
+
+std::string SocketAddress::asString() const
+{
+    return host + ":" + port;
+}
+
+const ::addrinfo& getAddrInfo(const SocketAddress& sa)
+{
+    return *sa.addrInfo;
+}
+
+}}
diff --git a/qpid/java/common.xml b/qpid/java/common.xml
index 3393be7..6b9c961 100644
--- a/qpid/java/common.xml
+++ b/qpid/java/common.xml
@@ -51,6 +51,9 @@
   <property name="tasks.classes"         location="${tasks}/classes"/>
   <property name="tasks.src"             location="${tasks}/src"/>
 
+  <property name="qpid.home"             location="${project.root}/build"/>
+  <property name="qpid.work"             location="${qpid.home}/work"/>
+
   <property name="javac.compiler.args"   value=""/>
 
   <property name="cobertura.dir" value="${project.root}/lib/cobertura" />
diff --git a/qpid/java/module.xml b/qpid/java/module.xml
index 5796af9..9fcc8de 100644
--- a/qpid/java/module.xml
+++ b/qpid/java/module.xml
@@ -287,9 +287,9 @@
       </syspropertyset>
       <sysproperty key="max_prefetch" value ="${max_prefetch}"/>
       <sysproperty key="example.plugin.target" value="${project.root}/build/lib/plugins"/>
-      <sysproperty key="QPID_EXAMPLE_HOME" value="${project.root}/build"/>
-      <sysproperty key="QPID_HOME" value="${project.root}/build"/>
-      <sysproperty key="QPID_WORK" value="${project.root}/build/work"/>
+      <sysproperty key="QPID_EXAMPLE_HOME" value="${qpid.home}"/>
+      <sysproperty key="QPID_HOME" value="${qpid.home}"/>
+      <sysproperty key="QPID_WORK" value="${qpid.work}"/>
 
       <formatter type="plain"/>
       <formatter type="xml"/>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
index f1a1c1a..e507ebc 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
@@ -179,9 +179,16 @@
                      messages.remove(0).getIntProperty("count"),
                      received.getIntProperty("count"));
 
-        // Allow ack to be sent to broker, by performing a synchronous command
-        // along the session.
-//        _session.createConsumer(_session.createTemporaryQueue()).close();
+        // When the Exception is received by the underlying IO layer it will
+        // initiate failover. The first step of which is to ensure that the
+        // existing conection is closed. So in this situation the connection
+        // will be flushed casuing the above ACK to be sent to the broker.
+        //
+        // That said:
+        // when the socket close is detected on the server it will rise up the
+        // Mina filter chain and interrupt processing.
+        // this has been raised as QPID-2138
+        _session.createConsumer(_session.createTemporaryQueue()).close();
 
         //Retain IO Layer
         AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession();
@@ -260,8 +267,14 @@
     private void initialiseConnection()
             throws Exception
     {
-        //Create Connection
-        _connection = (AMQConnection) getConnection();
+        //Create Connection using the default connection URL. i.e. not the Failover URL that would be used by default
+        _connection = (AMQConnection) getConnection(getConnectionFactory("default").getConnectionURL());
+        // The default connection does not have any retries configured so
+        // Allow this connection to retry so that we can block on the failover.
+        // The alternative would be to use the getConnection() default. However,
+        // this would add additional complexity in the logging as a second
+        // broker is defined in that url. We do not need it for this test.
+        _connection.getFailoverPolicy().getCurrentMethod().setRetries(1);
         _connection.setConnectionListener(this);
 
         _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
index 3e5470d..7a7d7f6 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
@@ -147,10 +147,23 @@
         
         setUpACLTest();
         
+        //QPID-2081: use a latch to sync on exception causing connection close, to work 
+        //around the connection close race during tearDown() causing sporadic failures
+        final CountDownLatch exceptionReceived = new CountDownLatch(1);
+        
         try
         {
             Connection conn = getConnection("guest", "guest");
 
+            conn.setExceptionListener(new ExceptionListener()
+            {
+                public void onException(JMSException e)
+                {
+                    exceptionReceived.countDown();
+                }
+            });
+
+            
             Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
             conn.start();
@@ -166,6 +179,11 @@
             assertNotNull("There was no liked exception", cause);
             assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
             assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+        
+            //use the latch to ensure the control thread waits long enough for the exception thread 
+            //to have done enough to mark the connection closed before teardown commences
+            assertTrue("Timed out waiting for conneciton to report close",
+            		exceptionReceived.await(2, TimeUnit.SECONDS));
         }
     }
 
@@ -195,6 +213,10 @@
     {
     	setUpACLTest();
     	
+    	//QPID-2081: use a latch to sync on exception causing connection close, to work 
+    	//around the connection close race during tearDown() causing sporadic failures
+    	final CountDownLatch exceptionReceived = new CountDownLatch(1);
+    	
         try
         {
             Connection conn = getConnection("client", "guest");
@@ -202,6 +224,14 @@
             //Prevent Failover
             ((AMQConnection) conn).setConnectionListener(this);
 
+            conn.setExceptionListener(new ExceptionListener()
+            {
+                public void onException(JMSException e)
+                {
+                    exceptionReceived.countDown();
+                }
+            });
+            
             Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
             conn.start();
@@ -217,6 +247,11 @@
             assertNotNull("There was no liked exception", cause);
             assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
             assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+        
+            //use the latch to ensure the control thread waits long enough for the exception thread 
+            //to have done enough to mark the connection closed before teardown commences
+            assertTrue("Timed out waiting for conneciton to report close",
+            		exceptionReceived.await(2, TimeUnit.SECONDS));
         }
     }
 
@@ -248,6 +283,10 @@
     {
     	setUpACLTest();
     	
+        //QPID-2081: use a latch to sync on exception causing connection close, to work 
+        //around the connection close race during tearDown() causing sporadic failures
+    	final CountDownLatch exceptionReceived = new CountDownLatch(1);
+    	
         try
         {
             Connection conn = getConnection("client", "guest");
@@ -256,6 +295,14 @@
 
             conn.start();
 
+            conn.setExceptionListener(new ExceptionListener()
+            {
+                public void onException(JMSException e)
+                {
+                    exceptionReceived.countDown();
+                }
+            });
+            
             //Create a Named Queue
             ((AMQSession) sesh).createQueue(new AMQShortString("IllegalQueue"), false, false, false);
 
@@ -266,6 +313,11 @@
         {
             amqe.printStackTrace();
             assertEquals("Incorrect error code thrown", 403, ((AMQAuthenticationException) amqe).getErrorCode().getCode());
+        
+            //use the latch to ensure the control thread waits long enough for the exception thread 
+            //to have done enough to mark the connection closed before teardown commences
+            assertTrue("Timed out waiting for conneciton to report close",
+            		exceptionReceived.await(2, TimeUnit.SECONDS));
         }
     }
 
@@ -334,11 +386,23 @@
     {
     	setUpACLTest();
     	
+        //QPID-2081: use a latch to sync on exception causing connection close, to work 
+        //around the connection close race during tearDown() causing sporadic failures
+    	final CountDownLatch exceptionReceived = new CountDownLatch(1);
+    	
         try
         {
             Connection conn = getConnection("client", "guest");
 
             ((AMQConnection) conn).setConnectionListener(this);
+            
+            conn.setExceptionListener(new ExceptionListener()
+            {
+                public void onException(JMSException e)
+                {
+                    exceptionReceived.countDown();
+                }
+            });
 
             Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
@@ -370,6 +434,11 @@
                     foundCorrectException = true;
                 }
             }
+            
+            //use the latch to ensure the control thread waits long enough for the exception thread 
+            //to have done enough to mark the connection closed before teardown commences
+            assertTrue("Timed out waiting for conneciton to report close",
+            		exceptionReceived.await(2, TimeUnit.SECONDS));
         }
         assertTrue("Did not get AMQAuthenticationException thrown", foundCorrectException);
     }
@@ -400,10 +469,22 @@
     {
     	setUpACLTest();
     	
+        //QPID-2081: use a latch to sync on exception causing connection close, to work 
+        //around the connection close race during tearDown() causing sporadic failures
+    	final CountDownLatch exceptionReceived = new CountDownLatch(1);
+    	
         try
         {
             Connection conn = getConnection("client", "guest");
 
+            conn.setExceptionListener(new ExceptionListener()
+            {
+                public void onException(JMSException e)
+                {
+                    exceptionReceived.countDown();
+                }
+            });
+            
             Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
             conn.start();
@@ -420,6 +501,11 @@
             assertNotNull("There was no liked exception", cause);
             assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
             assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+ 
+            //use the latch to ensure the control thread waits long enough for the exception thread 
+            //to have done enough to mark the connection closed before teardown commences
+            assertTrue("Timed out waiting for conneciton to report close",
+            		exceptionReceived.await(2, TimeUnit.SECONDS));
         }
     }
 
@@ -427,10 +513,22 @@
     {
     	setUpACLTest();
     	
+        //QPID-2081: use a latch to sync on exception causing connection close, to work 
+        //around the connection close race during tearDown() causing sporadic failures
+    	final CountDownLatch exceptionReceived = new CountDownLatch(1);
+    	
         try
         {
             Connection conn = getConnection("server", "guest");
 
+            conn.setExceptionListener(new ExceptionListener()
+            {
+                public void onException(JMSException e)
+                {
+                    exceptionReceived.countDown();
+                }
+            });
+            
             Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
             conn.start();
@@ -446,6 +544,11 @@
             assertNotNull("There was no liked exception", cause);
             assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
             assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+        
+            //use the latch to ensure the control thread waits long enough for the exception thread 
+            //to have done enough to mark the connection closed before teardown commences
+            assertTrue("Timed out waiting for conneciton to report close",
+            		exceptionReceived.await(2, TimeUnit.SECONDS));
         }
     }
 
@@ -487,10 +590,22 @@
     {
     	setUpACLTest();
     	
+        //QPID-2081: use a latch to sync on exception causing connection close, to work 
+        //around the connection close race during tearDown() causing sporadic failures
+    	final CountDownLatch exceptionReceived = new CountDownLatch(1);
+    	
         try
         {
             Connection conn = getConnection("server", "guest");
 
+            conn.setExceptionListener(new ExceptionListener()
+            {
+                public void onException(JMSException e)
+                {
+                    exceptionReceived.countDown();
+                }
+            });
+            
             Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
             conn.start();
@@ -504,6 +619,11 @@
         catch (AMQAuthenticationException amqe)
         {
             assertEquals("Incorrect error code thrown", 403, amqe.getErrorCode().getCode());
+            
+            //use the latch to ensure the control thread waits long enough for the exception thread 
+            //to have done enough to mark the connection closed before teardown commences
+            assertTrue("Timed out waiting for conneciton to report close",
+            		exceptionReceived.await(2, TimeUnit.SECONDS));
         }
     }
 
@@ -511,10 +631,22 @@
     {
     	setUpACLTest();
     	
+        //QPID-2081: use a latch to sync on exception causing connection close, to work 
+        //around the connection close race during tearDown() causing sporadic failures
+    	final CountDownLatch exceptionReceived = new CountDownLatch(1);
+
         try
         {
             Connection conn = getConnection("server", "guest");
 
+            conn.setExceptionListener(new ExceptionListener()
+            {
+                public void onException(JMSException e)
+                {
+                    exceptionReceived.countDown();
+                }
+            });
+
             Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
             conn.start();
@@ -531,6 +663,11 @@
             assertNotNull("There was no liked exception", cause);
             assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
             assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+
+            //use the latch to ensure the control thread waits long enough for the exception thread 
+            //to have done enough to mark the connection closed before teardown commences
+            assertTrue("Timed out waiting for conneciton to report close",
+            		exceptionReceived.await(2, TimeUnit.SECONDS));
         }
     }
 
@@ -538,11 +675,23 @@
     {
     	setUpACLTest();
     	
+        //QPID-2081: use a latch to sync on exception causing connection close, to work 
+        //around the connection close race during tearDown() causing sporadic failures
+    	final CountDownLatch exceptionReceived = new CountDownLatch(1);
+    	
         Connection connection = null;
         try
         {
             connection = getConnection("server", "guest");
 
+            connection.setExceptionListener(new ExceptionListener()
+            {
+                public void onException(JMSException e)
+                {
+                    exceptionReceived.countDown();
+                }
+            });
+            
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
             connection.start();
@@ -556,6 +705,11 @@
         catch (AMQAuthenticationException amqe)
         {
             assertEquals("Incorrect error code thrown", 403, amqe.getErrorCode().getCode());
+        
+            //use the latch to ensure the control thread waits long enough for the exception thread 
+            //to have done enough to mark the connection closed before teardown commences
+            assertTrue("Timed out waiting for conneciton to report close",
+            		exceptionReceived.await(2, TimeUnit.SECONDS));
         }
     }
 
@@ -653,9 +807,21 @@
     {
     	setUpACLTest();
     	
+        //QPID-2081: use a latch to sync on exception causing connection close, to work 
+        //around the connection close race during tearDown() causing sporadic failures
+    	final CountDownLatch exceptionReceived = new CountDownLatch(1);
+    	
         try
         {
             Connection conn = getConnection("server", "guest");
+            
+            conn.setExceptionListener(new ExceptionListener()
+            {
+                public void onException(JMSException e)
+                {
+                    exceptionReceived.countDown();
+                }
+            });
 
             ((AMQConnection) conn).setConnectionListener(this);
 
@@ -699,6 +865,11 @@
                 assertEquals("Incorrect exception", AMQAuthenticationException.class, cause.getClass());
                 assertEquals("Incorrect error code thrown", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
             }
+            
+            //use the latch to ensure the control thread waits long enough for the exception thread 
+            //to have done enough to mark the connection closed before teardown commences
+            assertTrue("Timed out waiting for conneciton to report close",
+            		exceptionReceived.await(2, TimeUnit.SECONDS));
         }
     }
 
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
index f22a405..7c5db29 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
@@ -155,7 +155,7 @@
         public void onMessage(Message message)
         {
             // Stop processing if we have an error and had to stop running.
-            if (_receviedAll.getCount() == 0)
+            if (_receivedAll.getCount() == 0)
             {
                 _logger.debug("Dumping msgs due to error(" + _causeOfFailure.get().getMessage() + "):" + message);
                 return;
@@ -191,7 +191,7 @@
                     // Acknowledge the first message if we are now on the cleaned pass
                     if (cleaned)
                     {
-                        _receviedAll.countDown();
+                        _receivedAll.countDown();
                     }
 
                     return;
@@ -234,7 +234,7 @@
                 // this will then trigger test teardown.
                 if (cleaned)
                 {
-                    _receviedAll.countDown();
+                    _receivedAll.countDown();
                 }
 
                 //Reset message count so we can try again.
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
index eb36522..ae7e30c 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
@@ -299,7 +299,7 @@
         }
         catch (InterruptedException e)
         {
-            fail("Failover was interuppted");
+            fail("Failover was interrupted");
         }
     }
 
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
index 4254727..a2703be 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
@@ -34,7 +34,7 @@
 
 public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener
 {
-    protected CountDownLatch _receviedAll;
+    protected CountDownLatch _receivedAll;
     protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null);
 
     @Override
@@ -46,7 +46,7 @@
     @Override
     public void init(boolean transacted, int mode) throws Exception
     {
-        _receviedAll = new CountDownLatch(NUM_MESSAGES);
+        _receivedAll = new CountDownLatch(NUM_MESSAGES);
 
         super.init(transacted, mode);
         _consumer.setMessageListener(this);
@@ -64,26 +64,36 @@
 
         _connection.start();
 
-        int lastCount = (int) _receviedAll.getCount();
+        // Set the lastCount to NUM_MESSAGES, this ensures that the compare
+        // against the receviedAll count is accurate.
+        int lastCount = NUM_MESSAGES;
 
-        boolean complete = _receviedAll.await(5000L, TimeUnit.MILLISECONDS);
+        // Wait for messages to arrive
+        boolean complete = _receivedAll.await(5000L, TimeUnit.MILLISECONDS);
 
+        // If the messasges haven't arrived
         while (!complete)
         {
-            int currentCount = (int) _receviedAll.getCount();
+            // Check how many we have received
+            int currentCount = (int) _receivedAll.getCount();
 
             // make sure we have received a message in the last cycle.
             if (lastCount == currentCount)
             {
+                // If we didn't receive any messages then stop.
+                // Something must have gone wrong.
+                System.err.println("Giving up waiting as we didn't receive anything.");
                 break;
             }
             // Remember the currentCount as the lastCount for the next cycle.
             // so we can exit if things get locked up.
             lastCount = currentCount;
 
-            complete = _receviedAll.await(5000L, TimeUnit.MILLISECONDS);
+            // Wait again for messages to arrive.
+            complete = _receivedAll.await(5000L, TimeUnit.MILLISECONDS);
         }
 
+        // If we failed to receive all the messages then fail the test.
         if (!complete)
         {
             // Check to see if we ended due to an exception in the onMessage handler
@@ -95,10 +105,11 @@
             }
             else
             {
-                fail("All messages not received missing:" + _receviedAll.getCount() + "/" + NUM_MESSAGES);
+                fail("All messages not received missing:" + _receivedAll.getCount() + "/" + NUM_MESSAGES);
             }
         }
 
+        // Even if we received all the messages.
         // Check to see if we ended due to an exception in the onMessage handler
         Exception cause = _causeOfFailure.get();
         if (cause != null)
@@ -131,7 +142,7 @@
     {
         try
         {
-            int count = NUM_MESSAGES - (int) _receviedAll.getCount();
+            int count = NUM_MESSAGES - (int) _receivedAll.getCount();
 
             assertEquals("Incorrect message received", count, message.getIntProperty(INDEX));
 
@@ -144,7 +155,7 @@
 
             doAcknowlegement(message);
 
-            _receviedAll.countDown();
+            _receivedAll.countDown();
         }
         catch (Exception e)
         {
@@ -162,9 +173,9 @@
     {
         _causeOfFailure.set(e);
         // End the test.
-        while (_receviedAll.getCount() != 0)
+        while (_receivedAll.getCount() != 0)
         {
-            _receviedAll.countDown();
+            _receivedAll.countDown();
         }
     }
 }
diff --git a/qpid/java/test-profiles/010Excludes b/qpid/java/test-profiles/010Excludes
index 757a1e4..454aede 100755
--- a/qpid/java/test-profiles/010Excludes
+++ b/qpid/java/test-profiles/010Excludes
@@ -75,9 +75,6 @@
 // The C++ server has a totally different persistence mechanism
 org.apache.qpid.server.store.PersistentStoreTest#*
 
-// QPID-1225 : Temporary remove this test until the problem has been addressed
-org.apache.qpid.server.security.acl.SimpleACLTest#testClientPublishInvalidQueueSuccess
-
 // CPP Broker does not follow the same Logging convention as the Java broker
 org.apache.qpid.server.logging.*
 
diff --git a/qpid/java/test-profiles/08StandaloneExcludes b/qpid/java/test-profiles/08StandaloneExcludes
index ee781fb..ed12973 100644
--- a/qpid/java/test-profiles/08StandaloneExcludes
+++ b/qpid/java/test-profiles/08StandaloneExcludes
@@ -23,7 +23,6 @@
 // InVM Broker tests awaiting resolution of QPID-1103
 org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#*
 org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#*
-org.apache.qpid.server.security.acl.SimpleACLTest#*
 
 // Those tests are written against the 0.10 path
 org.apache.qpid.test.unit.message.UTF8Test#*
diff --git a/qpid/java/test-profiles/Excludes b/qpid/java/test-profiles/Excludes
index aa60554..c9c9e91 100644
--- a/qpid/java/test-profiles/Excludes
+++ b/qpid/java/test-profiles/Excludes
@@ -17,9 +17,6 @@
 // QPID-XXX : Test fails to start external broker due to Derby Exception.
 org.apache.qpid.server.logging.DerbyMessageStoreLoggingTest#*
 
-// QPID-2081 :The configuration changes are now highlighting the close race condition
-org.apache.qpid.server.security.acl.SimpleACLTest#*
-
 // QPID-1816 : Client Ack has not been addressed
 org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDirtyClientAck
 org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testClientAck
diff --git a/qpid/python/examples/api/server b/qpid/python/examples/api/server
new file mode 100755
index 0000000..adb2dcf
--- /dev/null
+++ b/qpid/python/examples/api/server
@@ -0,0 +1,87 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, sys, traceback
+from qpid.messaging import *
+from qpid.util import URL
+from subprocess import Popen, STDOUT, PIPE
+from qpid.log import enable, DEBUG, WARN
+
+parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...",
+                               description="handle requests from the supplied address.")
+parser.add_option("-b", "--broker", default="localhost",
+                  help="connect to specified BROKER (default %default)")
+parser.add_option("-v", dest="verbose", action="store_true", help="enable logging")
+
+opts, args = parser.parse_args()
+
+if opts.verbose:
+  enable("qpid", DEBUG)
+else:
+  enable("qpid", WARN)
+
+url = URL(opts.broker)
+if args:
+  addr = args.pop(0)
+else:
+  parser.error("address is required")
+
+# XXX: should make URL default the port for us
+conn = Connection.open(url.host, url.port or AMQP_PORT,
+                       username=url.user, password=url.password)
+conn.reconnect = True
+ssn = conn.session()
+rcv = ssn.receiver(addr)
+
+def dispatch(msg):
+  msg_type = msg.properties.get("type")
+  if msg_type == "shell":
+    proc = Popen(msg.content, shell=True, stderr=STDOUT, stdin=PIPE, stdout=PIPE)
+    output, _ = proc.communicate()
+    result = Message(output)
+    result.properties["exit"] = proc.returncode
+  elif msg_type == "eval":
+    try:
+      content = eval(msg.content)
+    except:
+      content = traceback.format_exc()
+    result = Message(content)
+  else:
+    result = Message("unrecognized message type: %s" % msg_type)
+  return result
+
+while True:
+  try:
+    msg = rcv.fetch()
+    response = dispatch(msg)
+    snd = ssn.sender(msg.reply_to)
+    try:
+      snd.send(response)
+    except SendError, e:
+      print e
+    snd.close()
+    ssn.acknowledge()
+  except Empty:
+    break
+  except ReceiveError, e:
+    print e
+    break
+
+conn.close()
diff --git a/qpid/python/examples/api/ping b/qpid/python/examples/api/spout
similarity index 66%
rename from qpid/python/examples/api/ping
rename to qpid/python/examples/api/spout
index 59b367c..6a9b2b6 100755
--- a/qpid/python/examples/api/ping
+++ b/qpid/python/examples/api/spout
@@ -22,35 +22,59 @@
 from qpid.messaging import *
 from qpid.util import URL
 
+def nameval(st):
+  idx = st.find("=")
+  if idx >= 0:
+    name = st[0:idx]
+    value = st[idx+1:]
+  else:
+    name = st
+    value = None
+  return name, value
+
 parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]",
-                               description="Drain messages from the supplied address.")
+                               description="Send messages to the supplied address.")
 parser.add_option("-b", "--broker", default="localhost",
                   help="connect to specified BROKER (default %default)")
 parser.add_option("-c", "--count", type=int, default=1,
                   help="stop after count messages have been sent, zero disables (default %default)")
 parser.add_option("-t", "--timeout", type=float, default=None,
                   help="exit after the specified time")
-parser.add_option("-m", "--map", action="store_true",
-                  help="interpret content as map")
 parser.add_option("-i", "--id", help="use the supplied id instead of generating one")
+parser.add_option("-r", "--reply-to", help="specify reply-to address")
+parser.add_option("-P", "--property", dest="properties", action="append", default=[],
+                  help="specify message property")
+parser.add_option("-M", "--map", dest="entries", action="append", default=[],
+                  help="specify map entry for message body")
 
 opts, args = parser.parse_args()
 
 url = URL(opts.broker)
 if opts.id is None:
-  ping_id = str(uuid4())
+  spout_id = str(uuid4())
 else:
-  ping_id = opts.id
+  spout_id = opts.id
 if args:
   addr = args.pop(0)
 else:
   parser.error("address is required")
+
+content = None
+
 if args:
-  content = " ".join(args)
-  if opts.map:
-    content = eval(content)
+  text = " ".join(args)
 else:
-  content = None
+  text = None
+
+if opts.entries:
+  content = {}
+  if text:
+    content["text"] = text
+  for e in opts.entries:
+    name, val = nameval(e)
+    content[name] = val
+else:
+  content = text
 
 # XXX: should make URL default the port for us
 conn = Connection.open(url.host, url.port or AMQP_PORT,
@@ -62,8 +86,11 @@
 start = time.time()
 while (opts.count == 0 or count < opts.count) and \
       (opts.timeout is None or time.time() - start < opts.timeout):
-  msg = Message(content)
-  msg.properties["ping-id"] = "%s:%s" % (ping_id, count)
+  msg = Message(content, reply_to=opts.reply_to)
+  msg.properties["spout-id"] = "%s:%s" % (spout_id, count)
+  for p in opts.properties:
+    name, val = nameval(p)
+    msg.properties[name] = val
 
   try:
     snd.send(msg)
diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py
index 7c293fe..588b460 100644
--- a/qpid/python/qpid/driver.py
+++ b/qpid/python/qpid/driver.py
@@ -439,14 +439,19 @@
     if _snd is None and not snd.closing and not snd.closed:
       _snd = Attachment(snd)
 
+      if snd.target is None:
+        snd.error = ("target is None",)
+        snd.closed = True
+        return
+
       try:
         _snd.name, _snd.subject, _snd.options = address.parse(snd.target)
       except address.LexError, e:
-        snd.error = e
+        snd.error = (e,)
         snd.closed = True
         return
       except address.ParseError, e:
-        snd.error = e
+        snd.error = (e,)
         snd.closed = True
         return
 
@@ -502,14 +507,19 @@
       _rcv.canceled = False
       _rcv.draining = False
 
+      if rcv.source is None:
+        rcv.error = ("source is None",)
+        rcv.closed = True
+        return
+
       try:
         _rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source)
       except address.LexError, e:
-        rcv.error = e
+        rcv.error = (e,)
         rcv.closed = True
         return
       except address.ParseError, e:
-        rcv.error = e
+        rcv.error = (e,)
         rcv.closed = True
         return
 
diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py
index 2e4c0ca..860c366 100644
--- a/qpid/python/qpid/tests/messaging.py
+++ b/qpid/python/qpid/tests/messaging.py
@@ -597,6 +597,14 @@
       assert check(e), "unexpected error: %s" % e
       rcv.close()
 
+  def testNoneTarget(self):
+    # XXX: should have specific exception for this
+    self.sendErrorTest(None, SendError)
+
+  def testNoneSource(self):
+    # XXX: should have specific exception for this
+    self.fetchErrorTest(None, ReceiveError)
+
   def testNoTarget(self):
     # XXX: should have specific exception for this
     self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e))
diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj
index 9c13d47..cca131b 100644
--- a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj
+++ b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj
@@ -31,6 +31,8 @@
     <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>

     <FileAlignment>512</FileAlignment>

     <RunPostBuildEvent>OnBuildSuccess</RunPostBuildEvent>

+    <SignAssembly>true</SignAssembly>

+    <AssemblyOriginatorKeyFile>..\..\..\wcfnet.snk</AssemblyOriginatorKeyFile>

   </PropertyGroup>

   <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">

     <DebugSymbols>true</DebugSymbols>

@@ -73,6 +75,9 @@
     <Compile Include="Properties\AssemblyInfo.cs" />

     <Compile Include="PropertyName.cs" />

   </ItemGroup>

+  <ItemGroup>

+    <None Include="..\..\..\wcfnet.snk" />

+  </ItemGroup>

   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />

   <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 

        Other similar extension points exist, see Microsoft.Common.targets.

diff --git a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
index 0b04eba..7484bc3 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
+++ b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
@@ -32,9 +32,8 @@
     <FileAlignment>512</FileAlignment>

     <StartupObject>

     </StartupObject>

-    <SignAssembly>false</SignAssembly>

-    <AssemblyOriginatorKeyFile>

-    </AssemblyOriginatorKeyFile>

+    <SignAssembly>true</SignAssembly>

+    <AssemblyOriginatorKeyFile>..\..\..\wcfnet.snk</AssemblyOriginatorKeyFile>

   </PropertyGroup>

   <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">

     <DebugSymbols>true</DebugSymbols>

@@ -99,4 +98,7 @@
       <Name>Interop</Name>

     </ProjectReference>

   </ItemGroup>

+  <ItemGroup>

+    <None Include="..\..\..\wcfnet.snk" />

+  </ItemGroup>

 </Project>
\ No newline at end of file
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
index 484f689..b662be9 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
+++ b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
@@ -91,6 +91,7 @@
 				GenerateDebugInformation="true"

 				AssemblyDebug="1"

 				TargetMachine="1"

+				KeyFile="$(SolutionDir)\src\wcfnet.snk"

 			/>

 			<Tool

 				Name="VCALinkTool"

diff --git a/qpid/wcf/src/wcfnet.snk b/qpid/wcf/src/wcfnet.snk
new file mode 100644
index 0000000..d6456c8
--- /dev/null
+++ b/qpid/wcf/src/wcfnet.snk
Binary files differ