Start receiving messages after WebSocket session established (#943)
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index d6fd28b..ff9c641 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -40,6 +40,7 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerAck;
import org.apache.pulsar.websocket.data.ConsumerMessage;
+import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
@@ -91,7 +92,6 @@
log.warn("[{}:{}] Failed to add consumer handler for topic {}", request.getRemoteAddr(),
request.getRemotePort(), topic);
}
- receiveMessage();
} catch (Exception e) {
log.warn("[{}:{}] Failed in creating subscription {} on topic {}", request.getRemoteAddr(),
request.getRemotePort(), subscription, topic, e);
@@ -168,6 +168,12 @@
}
@Override
+ public void onWebSocketConnect(Session session) {
+ super.onWebSocketConnect(session);
+ receiveMessage();
+ }
+
+ @Override
public void onWebSocketText(String message) {
super.onWebSocketText(message);