swallow the parsing exceptions for now until they are fine tuned
diff --git a/library/src/main/java/com/datatorrent/lib/util/PubSubWebSocketClient.java b/library/src/main/java/com/datatorrent/lib/util/PubSubWebSocketClient.java
index 0a1ded2..8dd7eca 100644
--- a/library/src/main/java/com/datatorrent/lib/util/PubSubWebSocketClient.java
+++ b/library/src/main/java/com/datatorrent/lib/util/PubSubWebSocketClient.java
@@ -19,14 +19,19 @@
import java.net.URI;
import java.util.List;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
import com.ning.http.client.*;
import com.ning.http.client.AsyncHttpClient.BoundRequestBuilder;
import com.ning.http.client.websocket.*;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType;
@@ -35,7 +40,6 @@
import com.datatorrent.common.util.DTThrowable;
import com.datatorrent.common.util.NameableThreadFactory;
-import java.util.concurrent.atomic.AtomicReference;
/**
* <p>Abstract PubSubWebSocketClient class.</p>
@@ -66,8 +70,14 @@
pubSubMessage = codec.parseMessage(message);
PubSubWebSocketClient.this.onMessage(pubSubMessage.getType().getIdentifier(), pubSubMessage.getTopic(), pubSubMessage.getData());
}
+ catch (JsonParseException jpe) {
+ logger.warn("Ignoring unparceable JSON message: {}", message, jpe);
+ }
+ catch (JsonMappingException jme) {
+ logger.warn("Ignoring JSON mapping in message: {}", message, jme);
+ }
catch (IOException ex) {
- DTThrowable.rethrow(ex);
+ onError(ex);
}
}
@@ -277,6 +287,7 @@
if (connection == null) {
throw new IOException("Connection is not open");
}
+ return;
}
@@ -466,4 +477,6 @@
{
throwable.set(t);
}
+
+ private static final Logger logger = LoggerFactory.getLogger(PubSubWebSocketClient.class);
}