ReaderHandler starts receiving messages after the WebSocket session is established (#944)
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index c87e62e..84e712b 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -43,6 +43,7 @@
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerMessage;
+import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
@@ -87,7 +88,6 @@
log.warn("[{}:{}] Failed to add reader handler for topic {}", request.getRemoteAddr(),
request.getRemotePort(), topic);
}
- receiveMessage();
} catch (Exception e) {
log.warn("[{}:{}] Failed in creating reader {} on topic {}", request.getRemoteAddr(),
request.getRemotePort(), subscription, topic, e);
@@ -162,6 +162,12 @@
}
@Override
+ public void onWebSocketConnect(Session session) { // connect callback: receives the WebSocket Session for this handler
+ super.onWebSocketConnect(session); // run the parent handler's connect logic before anything else
+ receiveMessage(); // start receiving here; this change removes the earlier call made inside the reader-creation try block
+ }
+
+ @Override
public void onWebSocketText(String message) {
super.onWebSocketText(message);