ETCH-287 Fixing deadlock while calling methods on client side

On calls getResult(), hasResult(), getException() or hasException() on a
AsyncResult object the client waits until it gets notified from the
mailbox that a respective message has arrived.
In case the connection on socket level was broken before or during a
method call the AsyncResult object was not notified about this error
state and blocked forever.

Change-Id: I511e2fc60eb8b347a0d184cc02a7fd3ec937f2b6

git-svn-id: https://svn.apache.org/repos/asf/etch/trunk@1578869 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/binding-cpp/compiler/src/main/resources/org/apache/etch/bindings/cpp/compiler/remote_cpp.vm b/binding-cpp/compiler/src/main/resources/org/apache/etch/bindings/cpp/compiler/remote_cpp.vm
index 4150d64..f611163 100644
--- a/binding-cpp/compiler/src/main/resources/org/apache/etch/bindings/cpp/compiler/remote_cpp.vm
+++ b/binding-cpp/compiler/src/main/resources/org/apache/etch/bindings/cpp/compiler/remote_cpp.vm
@@ -52,8 +52,10 @@
     capu::SmartPointer<EtchObject> result;
     status = mBase->endcall(mb, $vfname::$n.getResultMessage().vname($helper), result);
     if(status != ETCH_OK) {
-      // TODO set result to a runtime exception 
-      return ETCH_OK;
+      capu::SmartPointer<EtchRuntimeException> exception = new EtchRuntimeException("Error occured while reading peers answer from mailbox for call $n.name() ", ETCH_ERROR);
+      setException(capu::smartpointer_cast<EtchException>(exception));
+      mMailbox = NULL;
+      return ETCH_ERROR;
     }
     
     //result is NULL if it was a void call and no exception has been set
@@ -68,10 +70,13 @@
 #end
     } else {
       setMailboxStatus();
-    }
-
-
-  }
+    } 
+  } else {
+    //mailbox has been closed without answer from peer
+    capu::SmartPointer<EtchRuntimeException> exception = new EtchRuntimeException("Peer did not answer in time to call $n.name()", ETCH_TIMEOUT);
+    setException(capu::smartpointer_cast<EtchException>(exception));
+    mMailbox = NULL;
+}
 
 #end
   return ETCH_OK;
@@ -83,7 +88,10 @@
   capu::SmartPointer<EtchMessage> msg;
   status = base->newMessage($vfname::$n.vname( $helper ), &msg); 
   if(status != ETCH_OK) {
-    // TODO log error
+    $n.name()AsyncResultRemote* result = new $n.name()AsyncResultRemote(base, NULL);
+    capu::SmartPointer<EtchRuntimeException> exception = new EtchRuntimeException("Unable to create message for call $n.name().", status);
+    result->setException(capu::smartpointer_cast<EtchException>(exception));
+    return result;
   }
 
 #set($ObjCount = 0)
@@ -95,11 +103,14 @@
 
   EtchMailbox* mb = NULL; 
   status = base->begincall(msg, mb);
+  $n.name()AsyncResultRemote* result = new $n.name()AsyncResultRemote(base, mb);
+
   if(status != ETCH_OK) {
-    // TODO log error
+    //mailbox has been closed during send
+    capu::SmartPointer<EtchRuntimeException> exception = new EtchRuntimeException("Unable to call $n.name() as connection has been closed by peer.", status);
+    result->setException(capu::smartpointer_cast<EtchException>(exception));
   }
 
-  $n.name()AsyncResultRemote* result = new $n.name()AsyncResultRemote(base, mb); 
   return result;
 }
 
diff --git a/binding-cpp/runtime/include/support/EtchMailbox.h b/binding-cpp/runtime/include/support/EtchMailbox.h
index 53a1c7f..848bb94 100644
--- a/binding-cpp/runtime/include/support/EtchMailbox.h
+++ b/binding-cpp/runtime/include/support/EtchMailbox.h
@@ -70,7 +70,7 @@
    * Closes the mailbox so that no more messages can be delivered.
    * Queued messages remain to be read.
    */
-  virtual status_t closeDelivery() = 0;
+  virtual status_t closeDelivery(capu::bool_t withNotification = true) = 0;
 
   /**
    * Closes the mailbox so that no more messages will be delivered or
diff --git a/binding-cpp/runtime/include/support/EtchPlainMailbox.h b/binding-cpp/runtime/include/support/EtchPlainMailbox.h
index 076c8e5..adb2154 100644
--- a/binding-cpp/runtime/include/support/EtchPlainMailbox.h
+++ b/binding-cpp/runtime/include/support/EtchPlainMailbox.h
@@ -73,7 +73,7 @@
    * @param true if this call closed the mailbox (that is, if action was
    * taken), false if the mailbox was already closed.
    */
-  status_t closeDelivery();
+  status_t closeDelivery(capu::bool_t withNotification = true);
 
   /**
    * Closes the mailbox so that no more messages will be delivered or
diff --git a/binding-cpp/runtime/src/main/support/EtchAsyncResultNone.cpp b/binding-cpp/runtime/src/main/support/EtchAsyncResultNone.cpp
index 1d666fd..f8af524 100644
--- a/binding-cpp/runtime/src/main/support/EtchAsyncResultNone.cpp
+++ b/binding-cpp/runtime/src/main/support/EtchAsyncResultNone.cpp
@@ -31,7 +31,7 @@
 EtchAsyncResultNone::~EtchAsyncResultNone() {
   if(mMailbox != NULL) {
     mMailbox->unregisterNotify(this);
-    mMailbox->closeDelivery();
+    mMailbox->closeDelivery(false);
     delete mMailbox;
   }
 }
diff --git a/binding-cpp/runtime/src/main/support/EtchPlainMailbox.cpp b/binding-cpp/runtime/src/main/support/EtchPlainMailbox.cpp
index 9321cf2..d49957e 100644
--- a/binding-cpp/runtime/src/main/support/EtchPlainMailbox.cpp
+++ b/binding-cpp/runtime/src/main/support/EtchPlainMailbox.cpp
@@ -91,7 +91,7 @@
   return ETCH_ERROR;
 }
 
-status_t EtchPlainMailbox::closeDelivery() {
+status_t EtchPlainMailbox::closeDelivery(capu::bool_t withNotification) {
 
   mMutex.lock();
   if(mQueue.isClosed()) {
@@ -103,12 +103,15 @@
   mQueue.close();
   mMutex.unlock();
 
-  fireNotify();
+  if (withNotification) {
+    fireNotify();
+  }
+
   return ETCH_OK;
 }
 
 status_t EtchPlainMailbox::closeRead() {
-  if (closeDelivery() == ETCH_OK) {
+  if (closeDelivery(false) == ETCH_OK) {
     EtchMailbox::EtchElement* mbe = NULL;
     while ((read(mbe)) == ETCH_OK) {
       mMailboxManager->redeliver(mbe->mSender, mbe->mMsg);
diff --git a/binding-cpp/runtime/src/main/transport/EtchPlainMailboxManager.cpp b/binding-cpp/runtime/src/main/transport/EtchPlainMailboxManager.cpp
index c824f14..922751b 100644
--- a/binding-cpp/runtime/src/main/transport/EtchPlainMailboxManager.cpp
+++ b/binding-cpp/runtime/src/main/transport/EtchPlainMailboxManager.cpp
@@ -26,14 +26,18 @@
 }
 
 EtchPlainMailboxManager::~EtchPlainMailboxManager() {
+  mMutex.lock();
+
   EtchHashTable<EtchLong, EtchMailbox*>::Iterator it = mMailboxes.begin();
   EtchHashTable<EtchLong, EtchMailbox*>::HashTableEntry entry;
-  // TODO check thread safety
+
   while (it.hasNext()) {
     it.next(&entry);
     entry.value->closeDelivery();
     delete entry.value;
   }
+
+  mMutex.unlock();
 }
 
 EtchTransportMessage* EtchPlainMailboxManager::getTransport() {
@@ -51,20 +55,16 @@
 
   const EtchLong msgid = mb->getMessageId();
 
-  mMutex.lock();
   if (!mUp) {
-    mMutex.unlock();
     return ETCH_EINVAL;
   }
 
   EtchMailbox* tmp = NULL;
   if (mMailboxes.get(msgid, &tmp) != ETCH_ENOT_EXIST) {
-    mMutex.unlock();
     return ETCH_EINVAL;
   }
 
   mMailboxes.put(msgid, mb);
-  mMutex.unlock();
   ETCH_LOG_DEBUG(mRuntime->getLogger(), mRuntime->getLogger().getMailboxContext(), "A new mailbox is registered");
   return ETCH_OK;
 }
@@ -119,19 +119,23 @@
 
   ETCH_LOG_DEBUG(mRuntime->getLogger(), mRuntime->getLogger().getMailboxContext(), "A mailbox has been created for msgid " << msgid);
   EtchMailbox *mb = new EtchPlainMailbox(this, msgid);
+  mMutex.lock();
   if (registerMailbox(mb) != ETCH_OK) {
     delete mb;
     ETCH_LOG_ERROR(mRuntime->getLogger(), mRuntime->getLogger().getMailboxContext(), "Mailbox registration failed");
+    mMutex.unlock();
     return ETCH_ERROR;
   }
 
   ETCH_LOG_DEBUG(mRuntime->getLogger(), mRuntime->getLogger().getMailboxContext(), "Message sending to Messagizer and registering a respective mailbox");
   if (mTransport->transportMessage(recipient, msg) == ETCH_OK) {
     result = mb;
+    mMutex.unlock();
     return ETCH_OK;
   } else {
-    unregisterMailbox(mb);
+    mb->closeDelivery();
     delete mb;
+    mMutex.unlock();
     return ETCH_ERROR;
   }
 }
@@ -173,12 +177,12 @@
 }
 
 status_t EtchPlainMailboxManager::sessionNotify(capu::SmartPointer<EtchObject> event) {
+  mMutex.lock();
   if (event->equals(&EtchSession::UP())) {
     mUp = true;
     ETCH_LOG_TRACE(mRuntime->getLogger(), mRuntime->getLogger().getMailboxContext(), "Connection is up");
   } else if (event->equals(&EtchSession::DOWN())) {
     mUp = false;
-    // TODO check thread safety
     EtchHashTable<EtchLong, EtchMailbox*>::Iterator it = mMailboxes.begin();
     EtchHashTable<EtchLong, EtchMailbox*>::HashTableEntry entry;
     while (it.hasNext()) {
@@ -188,6 +192,8 @@
     }
     ETCH_LOG_TRACE(mRuntime->getLogger(), mRuntime->getLogger().getMailboxContext(), "Connection is down");
   }
+  mMutex.unlock();
+
   status_t status;
   if(mSession != NULL) {
     status = mSession->sessionNotify(event);