KNOX-2004 - Adding changes for handling Ping/Pong message from backend server on websocket connection (#200)
* KNOX-2004: Adding changes for handling Ping/Pong message from backend server on websocket connection
* KNOX-2004: Adding delay before sending ping to fix test case failure on travis
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/MessageEventCallback.java b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/MessageEventCallback.java
index 4e3e1ea..2f7fcbe 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/MessageEventCallback.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/MessageEventCallback.java
@@ -18,6 +18,7 @@
package org.apache.knox.gateway.websockets;
import javax.websocket.CloseReason;
+import javax.websocket.PongMessage;
/**
* A simple callback interface used when evens happen on the Websocket client socket.
@@ -63,4 +64,11 @@
* @param session session
*/
void onMessageBinary(byte[] message, boolean last, Object session);
+
+ /**
+ * Callback when a pong control message is received.
+ * @param pongMessage pong message
+ * @param session session
+ */
+ void onMessagePong(PongMessage pongMessage, Object session);
}
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyInboundClient.java b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyInboundClient.java
index 7cb1c00..a1797ed 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyInboundClient.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyInboundClient.java
@@ -21,6 +21,7 @@
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.Session;
+import javax.websocket.PongMessage;
/**
* A Websocket client with callback which is not annotation based.
@@ -85,6 +86,21 @@
});
+ /* Add message handler for Pong Control Message */
+ session.addMessageHandler(new MessageHandler.Whole<PongMessage>() {
+
+ /**
+ * Called when a ping message has been received.
+ *
+ * @param message the message data.
+ */
+ @Override
+ public void onMessage(final PongMessage pongMessage) {
+ callback.onMessagePong(pongMessage, session);
+ }
+
+ });
+
callback.onConnectionOpen(backendSession);
}
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
index 4274e6e..9d5015d 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
@@ -125,12 +125,8 @@
final RemoteEndpoint remote = frontEndSession.getRemote();
try {
if (!messageBuffer.isEmpty()) {
- LOG.debugLog("Found old buffered messages");
- for (String obj:messageBuffer) {
- LOG.debugLog("Sending old buffered message [From Backend <---]: " + obj);
- remote.sendString(obj);
- }
- messageBuffer.clear();
+ flushBufferedMessages(remote);
+
if (remote.getBatchMode() == BatchMode.ON) {
remote.flush();
}
@@ -251,12 +247,7 @@
}
/* Proxy message to frontend */
- LOG.debugLog("Found old buffered messages");
- for (String obj:messageBuffer) {
- LOG.debugLog("Sending old buffered message [From Backend <---]: " + obj);
- remote.sendString(obj);
- }
- messageBuffer.clear();
+ flushBufferedMessages(remote);
LOG.debugLog("Sending current message [From Backend <---]: " + message);
remote.sendString(message);
@@ -281,6 +272,35 @@
}
+ @Override
+ public void onMessagePong(javax.websocket.PongMessage message, Object session) {
+ LOG.logMessage("[From Backend <---]: PING");
+ remoteLock.lock();
+ final RemoteEndpoint remote = getRemote();
+ try {
+ if (remote == null) {
+ LOG.debugLog("Remote endpoint is null");
+ return;
+ }
+
+ /* Proxy Ping message to frontend */
+ flushBufferedMessages(remote);
+
+ LOG.logMessage("Sending current PING [From Backend <---]: ");
+ remote.sendPing(message.getApplicationData());
+ if (remote.getBatchMode() == BatchMode.ON) {
+ remote.flush();
+ }
+ } catch (IOException e) {
+ LOG.connectionFailed(e);
+ throw new RuntimeIOException(e);
+ }
+ finally
+ {
+ remoteLock.unlock();
+ }
+ }
+
};
}
@@ -317,4 +337,16 @@
frontendSession.close();
}
}
+
+ /*
+ * Function to flush buffered messages. Should be called with remoteLock held
+ */
+ private void flushBufferedMessages(final RemoteEndpoint remote) throws IOException {
+ LOG.debugLog("Flushing old buffered messages");
+ for(String obj:messageBuffer) {
+ LOG.debugLog("Sending old buffered message [From Backend <---]: " + obj);
+ remote.sendString(obj);
+ }
+ messageBuffer.clear();
+ }
}
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ProxyInboundClientTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ProxyInboundClientTest.java
index dd5001f..08c4111 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ProxyInboundClientTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ProxyInboundClientTest.java
@@ -28,6 +28,7 @@
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;
+import javax.websocket.PongMessage;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import java.io.IOException;
@@ -123,6 +124,10 @@
@Override
public void onMessageBinary(byte[] message, boolean last, Object session) {
}
+
+ @Override
+ public void onMessagePong(PongMessage message, Object session) {
+ }
});
Assert.assertThat(client, instanceOf(javax.websocket.Endpoint.class));
@@ -176,6 +181,10 @@
receivedBinaryMessage = message;
isTestComplete.set(true);
}
+
+ @Override
+ public void onMessagePong(PongMessage message, Object session) {
+ }
});
Assert.assertThat(client, instanceOf(javax.websocket.Endpoint.class));
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketClient.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketClient.java
index 772be94..364f2c4 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketClient.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketClient.java
@@ -18,6 +18,8 @@
package org.apache.knox.gateway.websockets;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -30,6 +32,7 @@
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
+import javax.websocket.PongMessage;
import javax.websocket.Session;
import org.eclipse.jetty.util.BlockingArrayQueue;
@@ -61,6 +64,13 @@
this.messageQueue.offer(message);
}
+ @OnMessage
+ public void onMessage(PongMessage message) {
+ ByteBuffer byteMessage = message.getApplicationData();
+ String s = StandardCharsets.UTF_8.decode(byteMessage).toString();
+ this.messageQueue.offer(s);
+ }
+
@OnOpen
public void onOpen(Session session) {
this.session = session;
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketServerInitiatedPingTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketServerInitiatedPingTest.java
new file mode 100644
index 0000000..7c6d736
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketServerInitiatedPingTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.websockets;
+
+import org.eclipse.jetty.io.RuntimeIOException;
+import org.eclipse.jetty.websocket.api.BatchMode;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.server.WebSocketHandler;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.websocket.ContainerProvider;
+import javax.websocket.WebSocketContainer;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * A basic test that attempts to proxy websocket connections through Knox
+ * gateway.
+ * <p>
+ * The way the test is set up is as follows: <br>
+ * <ul>
+ * <li>A Mock Websocket server is setup which simply echos the responses sent by
+ * client.
+ * <li>Knox Gateway is set up with websocket handler
+ * {@link GatewayWebsocketHandler} that can proxy the requests.
+ * <li>Appropriate Topology and service definition files are set up with the
+ * address of the Websocket server.
+ * <li>A mock client is setup to connect to gateway.
+ * </ul>
+ *
+ * The test is to confirm whether the message is sent all the way to the backend
+ * Websocket server through Knox and back.
+ *
+ * @since 0.10
+ */
+public class WebsocketServerInitiatedPingTest extends WebsocketEchoTestBase {
+
+ public WebsocketServerInitiatedPingTest() {
+ super();
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ handler = new WebsocketServerInitiatedPingHandler();
+ WebsocketEchoTestBase.setUpBeforeClass();
+ WebsocketEchoTestBase.startServers("ws");
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ WebsocketEchoTestBase.tearDownAfterClass();
+ }
+
+ /*
+ * Test websocket server initiated ping
+ */
+ @Test
+ public void testGatewayServerInitiatedPing() throws Exception {
+ WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+
+ WebsocketClient client = new WebsocketClient();
+ container.connectToServer(client,
+ new URI(serverUri.toString() + "gateway/websocket/123foo456bar/channels"));
+
+ //session.getBasicRemote().sendText("Echo");
+ client.messageQueue.awaitMessages(1, 10000, TimeUnit.MILLISECONDS);
+
+ assertThat(client.messageQueue.get(0), is("PingPong"));
+ }
+
+ /**
+ * A Mock websocket handler
+ *
+ */
+ private static class WebsocketServerInitiatedPingHandler extends WebSocketHandler implements WebSocketCreator {
+ private final ServerInitiatingPingSocket socket = new ServerInitiatingPingSocket();
+
+ @Override
+ public void configure(WebSocketServletFactory factory) {
+ factory.getPolicy().setMaxTextMessageSize(2 * 1024 * 1024);
+ factory.setCreator(this);
+ }
+
+ @Override
+ public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
+ return socket;
+ }
+ }
+
+ /**
+ * A simple socket initiating message on connect
+ */
+ private static class ServerInitiatingPingSocket extends WebSocketAdapter {
+
+ @Override
+ public void onWebSocketError(Throwable cause) {
+ throw new RuntimeException(cause);
+ }
+
+ @Override
+ public void onWebSocketConnect(Session sess) {
+ super.onWebSocketConnect(sess);
+ try {
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ }
+ final String textMessage = "PingPong";
+ final ByteBuffer binaryMessage = ByteBuffer.wrap(
+ textMessage.getBytes(StandardCharsets.UTF_8));
+
+ try {
+ RemoteEndpoint remote = getRemote();
+ remote.sendPing(binaryMessage);
+ if (remote.getBatchMode() == BatchMode.ON) {
+ remote.flush();
+ }
+ } catch (IOException x) {
+ throw new RuntimeIOException(x);
+ }
+ }
+ }
+}