Start receiving messages after WebSocket session established (#943)

diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index d6fd28b..ff9c641 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -40,6 +40,7 @@
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ConsumerAck;
 import org.apache.pulsar.websocket.data.ConsumerMessage;
+import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.WriteCallback;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
 import org.slf4j.Logger;
@@ -91,7 +92,6 @@
                 log.warn("[{}:{}] Failed to add consumer handler for topic {}", request.getRemoteAddr(),
                         request.getRemotePort(), topic);
             }
-            receiveMessage();
         } catch (Exception e) {
             log.warn("[{}:{}] Failed in creating subscription {} on topic {}", request.getRemoteAddr(),
                     request.getRemotePort(), subscription, topic, e);
@@ -168,6 +168,12 @@
     }
 
     @Override
+    public void onWebSocketConnect(Session session) {
+        super.onWebSocketConnect(session);
+        receiveMessage();
+    }
+
+    @Override
     public void onWebSocketText(String message) {
         super.onWebSocketText(message);