swallow the parsing exceptions for now until they are fine tuned
diff --git a/library/src/main/java/com/datatorrent/lib/util/PubSubWebSocketClient.java b/library/src/main/java/com/datatorrent/lib/util/PubSubWebSocketClient.java
index 0a1ded2..8dd7eca 100644
--- a/library/src/main/java/com/datatorrent/lib/util/PubSubWebSocketClient.java
+++ b/library/src/main/java/com/datatorrent/lib/util/PubSubWebSocketClient.java
@@ -19,14 +19,19 @@
 import java.net.URI;
 import java.util.List;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.ning.http.client.*;
 import com.ning.http.client.AsyncHttpClient.BoundRequestBuilder;
 import com.ning.http.client.websocket.*;
 
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType;
 
@@ -35,7 +40,6 @@
 
 import com.datatorrent.common.util.DTThrowable;
 import com.datatorrent.common.util.NameableThreadFactory;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * <p>Abstract PubSubWebSocketClient class.</p>
@@ -66,8 +70,14 @@
         pubSubMessage = codec.parseMessage(message);
         PubSubWebSocketClient.this.onMessage(pubSubMessage.getType().getIdentifier(), pubSubMessage.getTopic(), pubSubMessage.getData());
       }
+      catch (JsonParseException jpe) {
+        logger.warn("Ignoring unparceable JSON message: {}", message, jpe);
+      }
+      catch (JsonMappingException jme) {
+        logger.warn("Ignoring JSON mapping in message: {}", message, jme);
+      }
       catch (IOException ex) {
-        DTThrowable.rethrow(ex);
+        onError(ex);
       }
     }
 
@@ -277,6 +287,7 @@
       if (connection == null) {
         throw new IOException("Connection is not open");
       }
+      return;
     }
 
 
@@ -466,4 +477,6 @@
   {
     throwable.set(t);
   }
+
+  private static final Logger logger = LoggerFactory.getLogger(PubSubWebSocketClient.class);
 }