ETCH-287 Fixing deadlock while calling methods on client side
On calls getResult(), hasResult(), getException() or hasException() on a
AsyncResult object the client waits until it gets notified from the
mailbox that a respective message has arrived.
In case the connection on socket level was broken before or during a
method call the AsyncResult object was not notified about this error
state and blocked forever.
Change-Id: I511e2fc60eb8b347a0d184cc02a7fd3ec937f2b6
git-svn-id: https://svn.apache.org/repos/asf/etch/trunk@1578869 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/binding-cpp/compiler/src/main/resources/org/apache/etch/bindings/cpp/compiler/remote_cpp.vm b/binding-cpp/compiler/src/main/resources/org/apache/etch/bindings/cpp/compiler/remote_cpp.vm
index 4150d64..f611163 100644
--- a/binding-cpp/compiler/src/main/resources/org/apache/etch/bindings/cpp/compiler/remote_cpp.vm
+++ b/binding-cpp/compiler/src/main/resources/org/apache/etch/bindings/cpp/compiler/remote_cpp.vm
@@ -52,8 +52,10 @@
capu::SmartPointer<EtchObject> result;
status = mBase->endcall(mb, $vfname::$n.getResultMessage().vname($helper), result);
if(status != ETCH_OK) {
- // TODO set result to a runtime exception
- return ETCH_OK;
+ capu::SmartPointer<EtchRuntimeException> exception = new EtchRuntimeException("Error occured while reading peers answer from mailbox for call $n.name() ", ETCH_ERROR);
+ setException(capu::smartpointer_cast<EtchException>(exception));
+ mMailbox = NULL;
+ return ETCH_ERROR;
}
//result is NULL if it was a void call and no exception has been set
@@ -68,10 +70,13 @@
#end
} else {
setMailboxStatus();
- }
-
-
- }
+ }
+ } else {
+ //mailbox has been closed without answer from peer
+ capu::SmartPointer<EtchRuntimeException> exception = new EtchRuntimeException("Peer did not answer in time to call $n.name()", ETCH_TIMEOUT);
+ setException(capu::smartpointer_cast<EtchException>(exception));
+ mMailbox = NULL;
+}
#end
return ETCH_OK;
@@ -83,7 +88,10 @@
capu::SmartPointer<EtchMessage> msg;
status = base->newMessage($vfname::$n.vname( $helper ), &msg);
if(status != ETCH_OK) {
- // TODO log error
+ $n.name()AsyncResultRemote* result = new $n.name()AsyncResultRemote(base, NULL);
+ capu::SmartPointer<EtchRuntimeException> exception = new EtchRuntimeException("Unable to create message for call $n.name().", status);
+ result->setException(capu::smartpointer_cast<EtchException>(exception));
+ return result;
}
#set($ObjCount = 0)
@@ -95,11 +103,14 @@
EtchMailbox* mb = NULL;
status = base->begincall(msg, mb);
+ $n.name()AsyncResultRemote* result = new $n.name()AsyncResultRemote(base, mb);
+
if(status != ETCH_OK) {
- // TODO log error
+ //mailbox has been closed during send
+ capu::SmartPointer<EtchRuntimeException> exception = new EtchRuntimeException("Unable to call $n.name() as connection has been closed by peer.", status);
+ result->setException(capu::smartpointer_cast<EtchException>(exception));
}
- $n.name()AsyncResultRemote* result = new $n.name()AsyncResultRemote(base, mb);
return result;
}
diff --git a/binding-cpp/runtime/include/support/EtchMailbox.h b/binding-cpp/runtime/include/support/EtchMailbox.h
index 53a1c7f..848bb94 100644
--- a/binding-cpp/runtime/include/support/EtchMailbox.h
+++ b/binding-cpp/runtime/include/support/EtchMailbox.h
@@ -70,7 +70,7 @@
* Closes the mailbox so that no more messages can be delivered.
* Queued messages remain to be read.
*/
- virtual status_t closeDelivery() = 0;
+ virtual status_t closeDelivery(capu::bool_t withNotification = true) = 0;
/**
* Closes the mailbox so that no more messages will be delivered or
diff --git a/binding-cpp/runtime/include/support/EtchPlainMailbox.h b/binding-cpp/runtime/include/support/EtchPlainMailbox.h
index 076c8e5..adb2154 100644
--- a/binding-cpp/runtime/include/support/EtchPlainMailbox.h
+++ b/binding-cpp/runtime/include/support/EtchPlainMailbox.h
@@ -73,7 +73,7 @@
* @param true if this call closed the mailbox (that is, if action was
* taken), false if the mailbox was already closed.
*/
- status_t closeDelivery();
+ status_t closeDelivery(capu::bool_t withNotification = true);
/**
* Closes the mailbox so that no more messages will be delivered or
diff --git a/binding-cpp/runtime/src/main/support/EtchAsyncResultNone.cpp b/binding-cpp/runtime/src/main/support/EtchAsyncResultNone.cpp
index 1d666fd..f8af524 100644
--- a/binding-cpp/runtime/src/main/support/EtchAsyncResultNone.cpp
+++ b/binding-cpp/runtime/src/main/support/EtchAsyncResultNone.cpp
@@ -31,7 +31,7 @@
EtchAsyncResultNone::~EtchAsyncResultNone() {
if(mMailbox != NULL) {
mMailbox->unregisterNotify(this);
- mMailbox->closeDelivery();
+ mMailbox->closeDelivery(false);
delete mMailbox;
}
}
diff --git a/binding-cpp/runtime/src/main/support/EtchPlainMailbox.cpp b/binding-cpp/runtime/src/main/support/EtchPlainMailbox.cpp
index 9321cf2..d49957e 100644
--- a/binding-cpp/runtime/src/main/support/EtchPlainMailbox.cpp
+++ b/binding-cpp/runtime/src/main/support/EtchPlainMailbox.cpp
@@ -91,7 +91,7 @@
return ETCH_ERROR;
}
-status_t EtchPlainMailbox::closeDelivery() {
+status_t EtchPlainMailbox::closeDelivery(capu::bool_t withNotification) {
mMutex.lock();
if(mQueue.isClosed()) {
@@ -103,12 +103,15 @@
mQueue.close();
mMutex.unlock();
- fireNotify();
+ if (withNotification) {
+ fireNotify();
+ }
+
return ETCH_OK;
}
status_t EtchPlainMailbox::closeRead() {
- if (closeDelivery() == ETCH_OK) {
+ if (closeDelivery(false) == ETCH_OK) {
EtchMailbox::EtchElement* mbe = NULL;
while ((read(mbe)) == ETCH_OK) {
mMailboxManager->redeliver(mbe->mSender, mbe->mMsg);
diff --git a/binding-cpp/runtime/src/main/transport/EtchPlainMailboxManager.cpp b/binding-cpp/runtime/src/main/transport/EtchPlainMailboxManager.cpp
index c824f14..922751b 100644
--- a/binding-cpp/runtime/src/main/transport/EtchPlainMailboxManager.cpp
+++ b/binding-cpp/runtime/src/main/transport/EtchPlainMailboxManager.cpp
@@ -26,14 +26,18 @@
}
EtchPlainMailboxManager::~EtchPlainMailboxManager() {
+ mMutex.lock();
+
EtchHashTable<EtchLong, EtchMailbox*>::Iterator it = mMailboxes.begin();
EtchHashTable<EtchLong, EtchMailbox*>::HashTableEntry entry;
- // TODO check thread safety
+
while (it.hasNext()) {
it.next(&entry);
entry.value->closeDelivery();
delete entry.value;
}
+
+ mMutex.unlock();
}
EtchTransportMessage* EtchPlainMailboxManager::getTransport() {
@@ -51,20 +55,16 @@
const EtchLong msgid = mb->getMessageId();
- mMutex.lock();
if (!mUp) {
- mMutex.unlock();
return ETCH_EINVAL;
}
EtchMailbox* tmp = NULL;
if (mMailboxes.get(msgid, &tmp) != ETCH_ENOT_EXIST) {
- mMutex.unlock();
return ETCH_EINVAL;
}
mMailboxes.put(msgid, mb);
- mMutex.unlock();
ETCH_LOG_DEBUG(mRuntime->getLogger(), mRuntime->getLogger().getMailboxContext(), "A new mailbox is registered");
return ETCH_OK;
}
@@ -119,19 +119,23 @@
ETCH_LOG_DEBUG(mRuntime->getLogger(), mRuntime->getLogger().getMailboxContext(), "A mailbox has been created for msgid " << msgid);
EtchMailbox *mb = new EtchPlainMailbox(this, msgid);
+ mMutex.lock();
if (registerMailbox(mb) != ETCH_OK) {
delete mb;
ETCH_LOG_ERROR(mRuntime->getLogger(), mRuntime->getLogger().getMailboxContext(), "Mailbox registration failed");
+ mMutex.unlock();
return ETCH_ERROR;
}
ETCH_LOG_DEBUG(mRuntime->getLogger(), mRuntime->getLogger().getMailboxContext(), "Message sending to Messagizer and registering a respective mailbox");
if (mTransport->transportMessage(recipient, msg) == ETCH_OK) {
result = mb;
+ mMutex.unlock();
return ETCH_OK;
} else {
- unregisterMailbox(mb);
+ mb->closeDelivery();
delete mb;
+ mMutex.unlock();
return ETCH_ERROR;
}
}
@@ -173,12 +177,12 @@
}
status_t EtchPlainMailboxManager::sessionNotify(capu::SmartPointer<EtchObject> event) {
+ mMutex.lock();
if (event->equals(&EtchSession::UP())) {
mUp = true;
ETCH_LOG_TRACE(mRuntime->getLogger(), mRuntime->getLogger().getMailboxContext(), "Connection is up");
} else if (event->equals(&EtchSession::DOWN())) {
mUp = false;
- // TODO check thread safety
EtchHashTable<EtchLong, EtchMailbox*>::Iterator it = mMailboxes.begin();
EtchHashTable<EtchLong, EtchMailbox*>::HashTableEntry entry;
while (it.hasNext()) {
@@ -188,6 +192,8 @@
}
ETCH_LOG_TRACE(mRuntime->getLogger(), mRuntime->getLogger().getMailboxContext(), "Connection is down");
}
+ mMutex.unlock();
+
status_t status;
if(mSession != NULL) {
status = mSession->sessionNotify(event);