KNOX-1997: Adding changes to buffer messages from backend in onMessag… (#143)
* KNOX-1997: Adding changes to buffer messages from backend in onMessageText() if frontend session related data structures have not been setup i.e. remote is null. Message buffer will be flushed when remote is set by other thread executing onWebSocketConnect() API. To synchronise reading/flushing buffer, added a lock
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java b/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
index 71dd591..342628b 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
@@ -141,6 +141,7 @@
public static final String WEBSOCKET_INPUT_BUFFER_SIZE = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.input.buffer.size";
public static final String WEBSOCKET_ASYNC_WRITE_TIMEOUT = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.async.write.timeout";
public static final String WEBSOCKET_IDLE_TIMEOUT = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.idle.timeout";
+ public static final String WEBSOCKET_MAX_WAIT_BUFFER_COUNT = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.max.wait.buffer.count";
/**
* Properties for for gateway port mapping feature
@@ -190,6 +191,7 @@
public static final int DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE = 4096;
public static final int DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT = 60000;
public static final int DEFAULT_WEBSOCKET_IDLE_TIMEOUT = 300000;
+ public static final int DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT = 100;
public static final boolean DEFAULT_GATEWAY_PORT_MAPPING_ENABLED = true;
public static final boolean DEFAULT_REMOTE_ALIAS_SERVICE_ENABLED = true;
@@ -851,6 +853,11 @@
}
@Override
+ public int getWebsocketMaxWaitBufferCount() {
+ return getInt( WEBSOCKET_MAX_WAIT_BUFFER_COUNT, DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT);
+ }
+
+ @Override
public Map<String, Integer> getGatewayPortMappings() {
final Map<String, Integer> result = new ConcurrentHashMap<>();
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/GatewayWebsocketHandler.java b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/GatewayWebsocketHandler.java
index 0f19052..8fff20b 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/GatewayWebsocketHandler.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/GatewayWebsocketHandler.java
@@ -118,7 +118,8 @@
LOG.debugLog("Generated backend URL for websocket connection: " + backendURL);
/* Upgrade happens here */
- return new ProxyWebSocketAdapter(URI.create(backendURL), pool, getClientEndpointConfig(req));
+ return new ProxyWebSocketAdapter
+ (URI.create(backendURL), pool, getClientEndpointConfig(req), config);
} catch (final Exception e) {
LOG.failedCreatingWebSocket(e);
throw e;
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
index 4c345cb..6364a01 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
@@ -19,7 +19,11 @@
import java.io.IOException;
import java.net.URI;
+import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.CloseReason;
@@ -28,6 +32,7 @@
import javax.websocket.WebSocketContainer;
import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+import org.apache.knox.gateway.config.GatewayConfig;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.websocket.api.BatchMode;
@@ -57,21 +62,30 @@
private ExecutorService pool;
+ /* Message buffer for holding data frames temporarily in memory till connection is setup.
+ Keeping the max size of the buffer as 100 messages for now. */
+ private List<String> messageBuffer = new ArrayList<String>();
+ private Lock remoteLock = new ReentrantLock();
+
+ private final GatewayConfig config;
+
/**
* Used to transmit headers from browser to backend server.
* @since 0.14
*/
private ClientEndpointConfig clientConfig;
- public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool) {
- this(backend, pool, null);
+ public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool, GatewayConfig config) {
+ this(backend, pool, null, config);
}
- public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool, final ClientEndpointConfig clientConfig) {
+ public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool, final ClientEndpointConfig clientConfig,
+ GatewayConfig config) {
super();
this.backend = backend;
this.pool = pool;
this.clientConfig = clientConfig;
+ this.config = config;
}
@Override
@@ -104,9 +118,33 @@
throw new RuntimeIOException(e);
}
+ remoteLock.lock();
super.onWebSocketConnect(frontEndSession);
this.frontendSession = frontEndSession;
+ final RemoteEndpoint remote = frontEndSession.getRemote();
+ try {
+ if (!messageBuffer.isEmpty()) {
+ LOG.debugLog("Found old buffered messages");
+ for (String obj:messageBuffer) {
+ LOG.debugLog("Sending old buffered message [From Backend <---]: " + obj);
+ remote.sendString(obj);
+ }
+ messageBuffer.clear();
+ if (remote.getBatchMode() == BatchMode.ON) {
+ remote.flush();
+ }
+ } else {
+ LOG.debugLog("Message buffer is empty");
+ }
+ } catch (IOException e) {
+ LOG.connectionFailed(e);
+ throw new RuntimeIOException(e);
+ }
+ finally
+ {
+ remoteLock.unlock();
+ }
}
@Override
@@ -198,12 +236,29 @@
@Override
public void onMessageText(String message, Object session) {
- final RemoteEndpoint remote = getRemote();
-
LOG.logMessage("[From Backend <---]" + message);
-
- /* Proxy message to frontend */
+ remoteLock.lock();
+ final RemoteEndpoint remote = getRemote();
try {
+ if (remote == null) {
+ LOG.debugLog("Remote endpoint is null");
+ if (messageBuffer.size() >= config.getWebsocketMaxWaitBufferCount()) {
+ throw new RuntimeIOException("Remote is null and message buffer is full. Cannot buffer anymore ");
+ }
+ LOG.debugLog("Buffering message: " + message);
+ messageBuffer.add(message);
+ return;
+ }
+
+ /* Proxy message to frontend */
+ LOG.debugLog("Found old buffered messages");
+ for (String obj:messageBuffer) {
+ LOG.debugLog("Sending old buffered message [From Backend <---]: " + obj);
+ remote.sendString(obj);
+ }
+ messageBuffer.clear();
+
+ LOG.debugLog("Sending current message [From Backend <---]: " + message);
remote.sendString(message);
if (remote.getBatchMode() == BatchMode.ON) {
remote.flush();
@@ -212,7 +267,10 @@
LOG.connectionFailed(e);
throw new RuntimeIOException(e);
}
-
+ finally
+ {
+ remoteLock.unlock();
+ }
}
@Override
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/BadBackendTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/BadBackendTest.java
index 6637813..194f172 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/BadBackendTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/BadBackendTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.knox.gateway.websockets;
+import org.apache.knox.gateway.config.GatewayConfig;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
@@ -25,6 +26,7 @@
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.easymock.EasyMock;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
@@ -77,13 +79,14 @@
}
private static void startProxy() throws Exception {
+ GatewayConfig gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
proxy = new Server();
proxyConnector = new ServerConnector(proxy);
proxy.addConnector(proxyConnector);
/* start Knox with WebsocketAdapter to test */
final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
- new ProxyWebSocketAdapter(new URI(BAD_BACKEND), Executors.newFixedThreadPool(10)));
+ new ProxyWebSocketAdapter(new URI(BAD_BACKEND), Executors.newFixedThreadPool(10), gatewayConfig));
ContextHandler context = new ContextHandler();
context.setContextPath("/");
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ConnectionDroppedTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ConnectionDroppedTest.java
index 80e3509..b2cf243 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ConnectionDroppedTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ConnectionDroppedTest.java
@@ -17,12 +17,14 @@
*/
package org.apache.knox.gateway.websockets;
+import org.apache.knox.gateway.config.GatewayConfig;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.easymock.EasyMock;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
@@ -112,13 +114,14 @@
}
private static void startProxy() throws Exception {
+ GatewayConfig gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
proxy = new Server();
proxyConnector = new ServerConnector(proxy);
proxy.addConnector(proxyConnector);
/* start Knox with WebsocketAdapter to test */
final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
- new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10)));
+ new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10), gatewayConfig));
ContextHandler context = new ContextHandler();
context.setContextPath("/");
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/MessageFailureTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/MessageFailureTest.java
index 64b7f9a..e855f82 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/MessageFailureTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/MessageFailureTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.knox.gateway.websockets;
+import org.apache.knox.gateway.config.GatewayConfig;
import org.apache.commons.lang3.RandomStringUtils;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@@ -26,6 +27,7 @@
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.easymock.EasyMock;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
@@ -152,13 +154,14 @@
}
private static void startProxy() throws Exception {
+ GatewayConfig gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
proxy = new Server();
proxyConnector = new ServerConnector(proxy);
proxy.addConnector(proxyConnector);
/* start Knox with WebsocketAdapter to test */
final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
- new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10)));
+ new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10), gatewayConfig));
ContextHandler context = new ContextHandler();
context.setContextPath("/");
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoHTTPServiceRoleTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoHTTPServiceRoleTest.java
index e66015a..717a454 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoHTTPServiceRoleTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoHTTPServiceRoleTest.java
@@ -59,6 +59,7 @@
@BeforeClass
public static void setUpBeforeClass() throws Exception {
+ handler = null;
WebsocketEchoTestBase.setUpBeforeClass();
WebsocketEchoTestBase.startServers("http");
}
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java
index 7a08b9e..1d3dbb8 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java
@@ -21,10 +21,10 @@
import org.junit.BeforeClass;
import org.junit.Test;
+import java.net.URI;
import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
-import java.net.URI;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.is;
@@ -58,6 +58,7 @@
@BeforeClass
public static void setUpBeforeClass() throws Exception {
+ handler = null;
WebsocketEchoTestBase.setUpBeforeClass();
WebsocketEchoTestBase.startServers("ws");
}
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTestBase.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTestBase.java
index a62d330..86a580a 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTestBase.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTestBase.java
@@ -36,6 +36,7 @@
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.websocket.server.WebSocketHandler;
import java.io.File;
import java.io.IOException;
@@ -60,9 +61,7 @@
import static org.apache.knox.gateway.config.GatewayConfig.DEFAULT_IDENTITY_KEY_PASSPHRASE_ALIAS;
/**
- * Base class for tests that attempt to proxy websocket connections through Knox
- * gateway. It setups a websocket socket connection that simply echoes data back.
- *
+ * Base class for websocoket echo tests.
*/
public class WebsocketEchoTestBase {
private static final String TEST_KEY_ALIAS = "test-identity";
@@ -70,7 +69,7 @@
/**
* Simulate backend websocket
*/
- private static Server backendServer;
+ public static Server backendServer;
/**
* URI for backend websocket server
*/
@@ -93,6 +92,8 @@
*/
public static URI serverUri;
+ public static WebSocketHandler handler;
+
private static File topoDir;
private static Path dataDir;
private static Path securityDir;
@@ -142,7 +143,11 @@
ServerConnector connector = new ServerConnector(backendServer);
backendServer.addConnector(connector);
- final WebsocketEchoHandler handler = new WebsocketEchoHandler();
+ synchronized (WebsocketEchoTestBase.class) {
+ if (handler == null) {
+ handler = new WebsocketEchoHandler();
+ }
+ }
ContextHandler context = new ContextHandler();
context.setContextPath("/");
@@ -272,6 +277,9 @@
EasyMock.expect(gatewayConfig.getWebsocketIdleTimeout())
.andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_IDLE_TIMEOUT).anyTimes();
+ EasyMock.expect(gatewayConfig.getWebsocketMaxWaitBufferCount())
+ .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT).anyTimes();
+
EasyMock.expect(gatewayConfig.getRemoteRegistryConfigurationNames())
.andReturn(Collections.emptyList())
.anyTimes();
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketMultipleConnectionTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketMultipleConnectionTest.java
index c69d5b9..a2a4ae7 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketMultipleConnectionTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketMultipleConnectionTest.java
@@ -113,7 +113,7 @@
/**
* Maximum number of open connections to test.
*/
- private static int MAX_CONNECTIONS = 100;
+ private static int MAX_CONNECTIONS = 99;
public WebsocketMultipleConnectionTest() {
super();
@@ -172,7 +172,7 @@
}
}
- latch.await(5 * MAX_CONNECTIONS, TimeUnit.MILLISECONDS);
+ latch.await(50 * MAX_CONNECTIONS, TimeUnit.MILLISECONDS);
/* 90 KB per connection */
/*
@@ -314,6 +314,9 @@
EasyMock.expect(gatewayConfig.getWebsocketIdleTimeout())
.andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_IDLE_TIMEOUT).anyTimes();
+ EasyMock.expect(gatewayConfig.getWebsocketMaxWaitBufferCount())
+ .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT).anyTimes();
+
EasyMock.expect(gatewayConfig.getRemoteRegistryConfigurationNames())
.andReturn(Collections.emptyList())
.anyTimes();
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketServerInitiatedMessageTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketServerInitiatedMessageTest.java
new file mode 100644
index 0000000..26722df
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketServerInitiatedMessageTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.websockets;
+
+import org.eclipse.jetty.io.RuntimeIOException;
+import org.eclipse.jetty.websocket.api.BatchMode;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.server.WebSocketHandler;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.websocket.ContainerProvider;
+import javax.websocket.WebSocketContainer;
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * A basic test that attempts to proxy websocket connections through Knox
+ * gateway.
+ * <p>
+ * The way the test is set up is as follows: <br>
+ * <ul>
+ * <li>A Mock Websocket server is setup which simply echos the responses sent by
+ * client.
+ * <li>Knox Gateway is set up with websocket handler
+ * {@link GatewayWebsocketHandler} that can proxy the requests.
+ * <li>Appropriate Topology and service definition files are set up with the
+ * address of the Websocket server.
+ * <li>A mock client is setup to connect to gateway.
+ * </ul>
+ *
+ * The test is to confirm whether the message is sent all the way to the backend
+ * Websocket server through Knox and back.
+ *
+ * @since 0.10
+ */
+public class WebsocketServerInitiatedMessageTest extends WebsocketEchoTestBase {
+
+ public WebsocketServerInitiatedMessageTest() {
+ super();
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ handler = new WebsocketServerInitiatedEchoHandler();
+ WebsocketEchoTestBase.setUpBeforeClass();
+ WebsocketEchoTestBase.startServers("ws");
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ WebsocketEchoTestBase.tearDownAfterClass();
+ }
+
+ /*
+ * Test websocket server initiated echo
+ */
+ @Test
+ public void testGatewayServerInitiatedEcho() throws Exception {
+ WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+
+ WebsocketClient client = new WebsocketClient();
+ container.connectToServer(client,
+ new URI(serverUri.toString() + "gateway/websocket/123foo456bar/channels"));
+
+ //session.getBasicRemote().sendText("Echo");
+ client.messageQueue.awaitMessages(1, 5000, TimeUnit.MILLISECONDS);
+
+ assertThat(client.messageQueue.get(0), is("echo"));
+ }
+
+ /**
+ * A Mock websocket handler
+ *
+ */
+ private static class WebsocketServerInitiatedEchoHandler extends WebSocketHandler implements WebSocketCreator {
+ private final ServerInitiatingMessageSocket socket = new ServerInitiatingMessageSocket();
+
+ @Override
+ public void configure(WebSocketServletFactory factory) {
+ factory.getPolicy().setMaxTextMessageSize(2 * 1024 * 1024);
+ factory.setCreator(this);
+ }
+
+ @Override
+ public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
+ return socket;
+ }
+ }
+
+ /**
+ * A simple socket initiating message on connect
+ */
+ private static class ServerInitiatingMessageSocket extends WebSocketAdapter {
+
+ @Override
+ public void onWebSocketError(Throwable cause) {
+ throw new RuntimeException(cause);
+ }
+
+ @Override
+ public void onWebSocketConnect(Session sess) {
+ super.onWebSocketConnect(sess);
+
+ try {
+ RemoteEndpoint remote = getRemote();
+ remote.sendString("echo", null);
+ if (remote.getBatchMode() == BatchMode.ON) {
+ remote.flush();
+ }
+ } catch (IOException x) {
+ throw new RuntimeIOException(x);
+ }
+ }
+ }
+}
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java b/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
index 790e6b2..3e6b32a 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
@@ -379,6 +379,13 @@
*/
int getWebsocketIdleTimeout();
+ /**
+ * Max count of messages that can be temporarily buffered in memory before a connection is properly setup.
+ * @since 0.10
+ * @return buffer size
+ */
+ int getWebsocketMaxWaitBufferCount();
+
boolean isMetricsEnabled();
boolean isJmxMetricsReportingEnabled();
diff --git a/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java b/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
index 2f3d588..4a2c754 100644
--- a/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
+++ b/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
@@ -47,6 +47,7 @@
public static final int DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE = 4096;
public static final int DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT = 60000;
public static final int DEFAULT_WEBSOCKET_IDLE_TIMEOUT = 300000;
+ public static final int DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT = 100;
private Path gatewayHomePath = Paths.get("gateway-home");
private String hadoopConfDir = "hadoop";
@@ -545,6 +546,11 @@
}
@Override
+ public int getWebsocketMaxWaitBufferCount() {
+ return DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT;
+ }
+
+ @Override
public boolean isMetricsEnabled() {
return false;
}