WICKET-6791 Offload WebSocket push when initiated in Wicket request cycle (#435)

Use different thread executors when pushing WebSocketPushMessage in http worker thread and non-worker thread
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
index b755c18..6b47820 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
@@ -39,6 +39,9 @@
 import org.slf4j.LoggerFactory;
 
 import javax.servlet.http.HttpServletRequest;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -63,6 +66,7 @@
 	 * are never passed to the Servlet Filters.
 	 */
 	private static boolean USING_JAVAX_WEB_SOCKET = false;
+
 	static
 	{
 		try
@@ -95,6 +99,7 @@
 			{
 				synchronized (application)
 				{
+					settings = application.getMetaData(KEY);
 					if (settings == null)
 					{
 						settings = new WebSocketSettings();
@@ -114,7 +119,7 @@
 	/**
 	 * The executor that handles the processing of Web Socket push message broadcasts.
 	 */
-	private Executor webSocketPushMessageExecutor = new SameThreadExecutor();
+	private Executor webSocketPushMessageExecutor = new WebSocketPushMessageExecutor();
 
 	/**
 	 * The executor that handles broadcast of the {@link org.apache.wicket.protocol.ws.api.event.WebSocketPayload}
@@ -237,11 +242,12 @@
 	 */
 	public Executor getSendPayloadExecutor()
 	{
-	return sendPayloadExecutor;
+		return sendPayloadExecutor;
 	}
 
 	/**
 	 * Sets the filter for checking the incoming connections
+	 *
 	 * @param connectionFilter
 	 *              the filter for checking the incoming connections
 	 * @see WebSocketConnectionFilterCollection
@@ -255,7 +261,8 @@
 	 * @return the filter for checking the incoming connections
 	 * @see WebSocketConnectionFilterCollection
 	 */
-	public IWebSocketConnectionFilter getConnectionFilter() {
+	public IWebSocketConnectionFilter getConnectionFilter()
+	{
 		return this.connectionFilter;
 	}
 
@@ -283,7 +290,8 @@
 	 *          The active web socket connection
 	 * @return a new instance of WebSocketRequestHandler for processing a web socket request
 	 */
-	public WebSocketRequestHandler newWebSocketRequestHandler(Page page, IWebSocketConnection connection) {
+	public WebSocketRequestHandler newWebSocketRequestHandler(Page page, IWebSocketConnection connection)
+	{
 		return new WebSocketRequestHandler(page, connection);
 	}
 
@@ -302,11 +310,13 @@
 		return new WebSocketRequest(new ServletRequestCopy(request), filterPath);
 	}
 
-	public void setFilterPrefix(final CharSequence filterPrefix) {
+	public void setFilterPrefix(final CharSequence filterPrefix)
+	{
 		this.filterPrefix.set(filterPrefix);
 	}
 
-	public CharSequence getFilterPrefix() {
+	public CharSequence getFilterPrefix()
+	{
 		if (filterPrefix.get() == null)
 		{
 			if (USING_JAVAX_WEB_SOCKET)
@@ -321,11 +331,13 @@
 		return filterPrefix.get();
 	}
 
-	public void setContextPath(final CharSequence contextPath) {
+	public void setContextPath(final CharSequence contextPath)
+	{
 		this.contextPath.set(contextPath);
 	}
 
-	public CharSequence getContextPath() {
+	public CharSequence getContextPath()
+	{
 		contextPath.compareAndSet(null, RequestCycle.get().getRequest().getContextPath());
 		return contextPath.get();
 	}
@@ -335,7 +347,8 @@
 		this.baseUrl.set(baseUrl);
 	}
 
-	public CharSequence getBaseUrl() {
+	public CharSequence getBaseUrl()
+	{
 		if (baseUrl.get() == null)
 		{
 			Url _baseUrl = RequestCycle.get().getUrlRenderer().getBaseUrl();
@@ -393,4 +406,66 @@
 			command.run();
 		}
 	}
+
+	public static class WebSocketPushMessageExecutor implements Executor
+	{
+		/**
+		 * An executor that should be used when the WebSocket message is pushed
+		 * from non-http worker thread.
+		 */
+		private final java.util.concurrent.Executor nonHttpRequestExecutor;
+
+		/**
+		 * An executor that is used when the WebSocket push is initiated in
+		 * http worker thread. In this case the WebSocket processing should be
+		 * off-loaded to a different thread that should wait for the page instance
+		 * lock.
+		 */
+		private final java.util.concurrent.Executor httpRequestExecutor;
+
+		/**
+		 * For non-http worker threads pushes the WebSocket runnable in the same request.
+		 * For http worker threads uses an elastic thread pool of 1-8 threads.
+		 *
+		 * Use {@link WebSocketPushMessageExecutor#WebSocketPushMessageExecutor(java.util.concurrent.Executor, java.util.concurrent.Executor)}
+		 * for custom behavior and/or settings
+		 */
+		public WebSocketPushMessageExecutor()
+		{
+			this(Runnable::run, new ThreadPoolExecutor(1, 8,
+			                                           60L, TimeUnit.SECONDS,
+			                                           new SynchronousQueue<>(),
+			                                           new ThreadFactory()));
+		}
+
+		public WebSocketPushMessageExecutor(java.util.concurrent.Executor nonHttpRequestExecutor, java.util.concurrent.Executor httpRequestExecutor)
+		{
+			this.nonHttpRequestExecutor = nonHttpRequestExecutor;
+			this.httpRequestExecutor = httpRequestExecutor;
+		}
+
+		@Override
+		public void run(final Runnable command)
+		{
+			if (RequestCycle.get() != null)
+			{
+				httpRequestExecutor.execute(command);
+			}
+			else
+			{
+				nonHttpRequestExecutor.execute(command);
+			}
+		}
+	}
+
+	public static class ThreadFactory implements java.util.concurrent.ThreadFactory
+	{
+		private final AtomicInteger counter = new AtomicInteger();
+
+		@Override
+		public Thread newThread(final Runnable r)
+		{
+			return new Thread(r, "Wicket-WebSocket-HttpRequest-Thread-" + counter.getAndIncrement());
+		}
+	}
 }
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java
index f83894f..df67eac 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java
@@ -56,6 +56,9 @@
 			webApplication.getWicketFilter().setFilterPath("");
 		}
 
+		WebSocketSettings webSocketSettings = WebSocketSettings.Holder.get(webApplication);
+		webSocketSettings.setWebSocketPushMessageExecutor(new WebSocketSettings.SameThreadExecutor());
+
 		socketProcessor = new TestWebSocketProcessor(wicketTester, page)
 		{
 			@Override
@@ -93,6 +96,9 @@
 			webApplication.getWicketFilter().setFilterPath("");
 		}
 
+		WebSocketSettings webSocketSettings = WebSocketSettings.Holder.get(webApplication);
+		webSocketSettings.setWebSocketPushMessageExecutor(new WebSocketSettings.SameThreadExecutor());
+
 		socketProcessor = new TestWebSocketProcessor(wicketTester, resourceName)
 		{
 			@Override