blob: 1fc824d51b69ec1af90fe94cf3f7ca8b84419b88 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.comet;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import javax.net.SocketFactory;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.apache.catalina.Context;
import org.apache.catalina.Wrapper;
import org.apache.catalina.comet.CometEvent.EventType;
import org.apache.catalina.connector.CometEventImpl;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.startup.Tomcat;
import org.apache.catalina.startup.TomcatBaseTest;
import org.apache.catalina.valves.TesterAccessLogValve;
import org.apache.catalina.valves.ValveBase;
public class TestCometProcessor extends TomcatBaseTest {
/**
 * Checks that a Comet request whose event is closed from another thread
 * (see {@link AsyncCometCloseValve}) finishes its chunked response and that
 * the same connection can then serve a regular request to {@code /hello}.
 *
 * @throws Exception if Tomcat cannot be configured or started, or if the
 *                   client side socket I/O fails
 */
@Test
public void testAsyncClose() throws Exception {
    Assume.assumeTrue(
            "This test is skipped, because this connector does not support Comet.",
            isCometSupported());

    // Setup Tomcat instance
    Tomcat tomcat = getTomcatInstance();
    Context root = tomcat.addContext("", TEMP_DIR);
    Tomcat.addServlet(root, "comet", new SimpleCometServlet());
    root.addServletMapping("/comet", "comet");
    Tomcat.addServlet(root, "hello", new HelloWorldServlet());
    root.addServletMapping("/hello", "hello");
    root.getPipeline().addValve(new AsyncCometCloseValve());
    tomcat.getConnector().setProperty("connectionTimeout", "5000");
    tomcat.start();

    // Create connection to Comet servlet
    final Socket socket =
            SocketFactory.getDefault().createSocket("localhost", getPort());
    try {
        socket.setSoTimeout(5000);

        final OutputStream os = socket.getOutputStream();
        String requestLine = "POST http://localhost:" + getPort() +
                "/comet HTTP/1.1\r\n";
        os.write(requestLine.getBytes());
        os.write("transfer-encoding: chunked\r\n".getBytes());
        os.write("\r\n".getBytes());

        InputStream is = socket.getInputStream();
        ResponseReaderThread readThread = new ResponseReaderThread(is);
        readThread.start();

        // Wait (up to ~5s) for the terminating chunk of the comet response
        int count = 0;
        while (count < 10 && !readThread.getResponse().endsWith("0\r\n\r\n")) {
            Thread.sleep(500);
            count++;
        }
        // Test the condition itself rather than the counter: the response
        // may have arrived during the final sleep, in which case the counter
        // reaches its limit although the request did complete.
        if (!readThread.getResponse().endsWith("0\r\n\r\n")) {
            fail("Comet request did not complete");
        }

        // Send a standard HTTP request on the same connection
        requestLine = "GET http://localhost:" + getPort() +
                "/hello HTTP/1.1\r\n";
        os.write(requestLine.getBytes());
        os.write("\r\n".getBytes());

        // Check for the expected response
        count = 0;
        while (count < 10 && !readThread.getResponse().contains(
                HelloWorldServlet.RESPONSE_TEXT)) {
            Thread.sleep(500);
            count++;
        }
        if (!readThread.getResponse().contains(
                HelloWorldServlet.RESPONSE_TEXT)) {
            fail("Non-comet request did not complete");
        }

        // The reader ends once its blocking read returns EOF or throws
        // (for example on the 5s socket read timeout set above)
        readThread.join();
        os.close();
        is.close();
    } finally {
        // Always release the client socket, including when the test fails;
        // this also unblocks the reader thread. Closing twice is harmless.
        socket.close();
    }
}
/**
 * Checks that a Comet request whose event is closed synchronously inside
 * the servlet's {@code event()} call (see {@link CometCloseServlet})
 * produces a complete chunked response containing {@code OK}, and that the
 * same connection can then serve a regular request to {@code /hello}.
 *
 * @throws Exception if Tomcat cannot be configured or started, or if the
 *                   client side socket I/O fails
 */
@Test
public void testSyncClose() throws Exception {
    Assume.assumeTrue(
            "This test is skipped, because this connector does not support Comet.",
            isCometSupported());

    // Setup Tomcat instance
    Tomcat tomcat = getTomcatInstance();
    Context root = tomcat.addContext("", TEMP_DIR);
    Tomcat.addServlet(root, "comet", new CometCloseServlet());
    root.addServletMapping("/comet", "comet");
    Tomcat.addServlet(root, "hello", new HelloWorldServlet());
    root.addServletMapping("/hello", "hello");
    tomcat.getConnector().setProperty("connectionTimeout", "5000");
    tomcat.start();

    // Create connection to Comet servlet
    final Socket socket =
            SocketFactory.getDefault().createSocket("localhost", getPort());
    try {
        socket.setSoTimeout(5000);

        final OutputStream os = socket.getOutputStream();
        String requestLine = "POST http://localhost:" + getPort() +
                "/comet HTTP/1.1\r\n";
        os.write(requestLine.getBytes());
        os.write("transfer-encoding: chunked\r\n".getBytes());
        os.write("\r\n".getBytes());
        // Don't send any data
        os.write("0\r\n\r\n".getBytes());

        InputStream is = socket.getInputStream();
        ResponseReaderThread readThread = new ResponseReaderThread(is);
        readThread.start();

        // Wait (up to ~5s) for the terminating chunk of the comet response
        int count = 0;
        while (count < 10 && !readThread.getResponse().endsWith("0\r\n\r\n")) {
            Thread.sleep(500);
            count++;
        }
        // Test the condition itself rather than the counter: the response
        // may have arrived during the final sleep. Doing this before the
        // payload assertion makes a timeout report the meaningful message.
        if (!readThread.getResponse().endsWith("0\r\n\r\n")) {
            fail("Comet request did not complete");
        }
        // The servlet's "OK" must have been sent as a two byte chunk
        Assert.assertTrue("Comet response did not contain the OK chunk",
                readThread.getResponse().contains("2\r\nOK"));

        // Send a standard HTTP request on the same connection
        requestLine = "GET http://localhost:" + getPort() +
                "/hello HTTP/1.1\r\n";
        os.write(requestLine.getBytes());
        os.write("connection: close\r\n".getBytes());
        os.write("\r\n".getBytes());

        // Check for the expected response
        count = 0;
        while (count < 10 && !readThread.getResponse().contains(
                HelloWorldServlet.RESPONSE_TEXT)) {
            Thread.sleep(500);
            count++;
        }
        if (!readThread.getResponse().contains(
                HelloWorldServlet.RESPONSE_TEXT)) {
            fail("Non-comet request did not complete");
        }

        // The reader ends once its blocking read returns EOF or throws
        readThread.join();
        os.close();
        is.close();
    } finally {
        // Always release the client socket, including when the test fails;
        // this also unblocks the reader thread. Closing twice is harmless.
        socket.close();
    }
}
/**
 * Checks that when a Comet servlet sets {@code Connection: close} (see
 * {@link ConnectionCloseServlet}) the client receives the {@code OK} body
 * and its reader thread then terminates without recording an exception.
 *
 * @throws Exception if Tomcat cannot be configured or started, or if the
 *                   client side socket I/O fails
 */
@Test
public void testConnectionClose() throws Exception {
    Assume.assumeTrue(
            "This test is skipped, because this connector does not support Comet.",
            isCometSupported());

    // Setup Tomcat instance
    Tomcat tomcat = getTomcatInstance();
    Context root = tomcat.addContext("", TEMP_DIR);
    Tomcat.addServlet(root, "comet", new ConnectionCloseServlet());
    root.addServletMapping("/comet", "comet");
    Tomcat.addServlet(root, "hello", new HelloWorldServlet());
    root.addServletMapping("/hello", "hello");
    tomcat.getConnector().setProperty("connectionTimeout", "5000");
    tomcat.start();

    // Create connection to Comet servlet
    final Socket socket =
            SocketFactory.getDefault().createSocket("localhost", getPort());
    try {
        socket.setSoTimeout(5000);

        final OutputStream os = socket.getOutputStream();
        String requestLine = "POST http://localhost:" + getPort() +
                "/comet HTTP/1.1\r\n";
        os.write(requestLine.getBytes());
        os.write("transfer-encoding: chunked\r\n".getBytes());
        os.write("\r\n".getBytes());
        // Don't send any data
        os.write("0\r\n\r\n".getBytes());

        InputStream is = socket.getInputStream();
        ResponseReaderThread readThread = new ResponseReaderThread(is);
        readThread.start();

        // Wait (up to ~5s) for the comet response body to arrive
        int count = 0;
        while (count < 10 && !readThread.getResponse().endsWith("OK")) {
            Thread.sleep(500);
            count++;
        }
        // Test the condition itself rather than the counter: the response
        // may have arrived during the final sleep.
        if (!readThread.getResponse().endsWith("OK")) {
            fail("Comet request did not complete");
        }

        // Seeing the body does not mean the reader has already observed the
        // end of the stream, so give it a bounded amount of time to finish
        // before asserting on its state.
        readThread.join(5000);

        // Read thread should have terminated cleanly when the server closed
        // the socket
        Assert.assertFalse(readThread.isAlive());
        Assert.assertNull(readThread.getException());
        os.close();
        is.close();
    } finally {
        // Always release the client socket, including when the test fails;
        // this also unblocks the reader thread. Closing twice is harmless.
        socket.close();
    }
}
/**
 * Runs the simple Comet client scenario without asking the servlet to fail
 * at any stage.
 *
 * @throws Exception propagated from {@code doSimpleCometTest}
 */
@Test
public void testSimpleCometClient() throws Exception {
    // A null init parameter name means no failure flag is set on the servlet
    final String noFailureParam = null;
    doSimpleCometTest(noFailureParam);
}
/**
 * Runs the simple Comet client scenario with the servlet configured to
 * throw while handling the BEGIN event.
 *
 * @throws Exception propagated from {@code doSimpleCometTest}
 */
@Test
public void testSimpleCometClientBeginFail() throws Exception {
    final String failureParam = SimpleCometServlet.FAIL_ON_BEGIN;
    doSimpleCometTest(failureParam);
}
/**
 * Runs the simple Comet client scenario with the servlet configured to
 * throw while handling a READ event.
 *
 * @throws Exception propagated from {@code doSimpleCometTest}
 */
@Test
public void testSimpleCometClientReadFail() throws Exception {
    final String failureParam = SimpleCometServlet.FAIL_ON_READ;
    doSimpleCometTest(failureParam);
}
/**
 * Runs the simple Comet client scenario with the servlet configured to
 * throw while handling the END event.
 *
 * @throws Exception propagated from {@code doSimpleCometTest}
 */
@Test
public void testSimpleCometClientEndFail() throws Exception {
    final String failureParam = SimpleCometServlet.FAIL_ON_END;
    doSimpleCometTest(failureParam);
}
/**
 * Shared driver for the simple Comet client tests. Deploys a
 * {@link SimpleCometServlet} at {@code /}, posts a chunked request whose
 * body is four {@code PING} chunks followed by the terminating chunk, and
 * validates what came back.
 *
 * @param initParam name of the servlet init parameter to set to
 *                  {@code "true"} (one of the {@code FAIL_ON_*} constants),
 *                  or {@code null} to run without an injected failure
 * @throws Exception if Tomcat cannot be configured or started, or if the
 *                   client side socket I/O fails
 */
private void doSimpleCometTest(String initParam) throws Exception {
    Assume.assumeTrue(
            "This test is skipped, because this connector does not support Comet.",
            isCometSupported());

    // Deploy the comet servlet (optionally told to fail) plus an access log
    // valve that is inspected in the failure scenarios
    final Tomcat tomcat = getTomcatInstance();
    final Context ctx = tomcat.addContext("", TEMP_DIR);
    final Wrapper cometWrapper =
            Tomcat.addServlet(ctx, "comet", new SimpleCometServlet());
    if (initParam != null) {
        cometWrapper.addInitParameter(initParam, "true");
    }
    ctx.addServletMapping("/", "comet");
    final TesterAccessLogValve accessLog = new TesterAccessLogValve();
    ctx.getPipeline().addValve(accessLog);
    tomcat.start();

    // Open the client connection and send the request head
    final Socket client =
            SocketFactory.getDefault().createSocket("localhost", getPort());
    client.setSoTimeout(60000);
    final OutputStream out = client.getOutputStream();
    final String head = "POST http://localhost:" + getPort() +
            "/ HTTP/1.1\r\n";
    out.write(head.getBytes());
    out.write("transfer-encoding: chunked\r\n".getBytes());
    out.write("\r\n".getBytes());

    // The body is produced in the background while the response is read
    final PingWriterThread pinger = new PingWriterThread(4, out);
    pinger.start();

    client.setSoTimeout(25000);
    final InputStream in = client.getInputStream();
    final ResponseReaderThread reader = new ResponseReaderThread(in);
    reader.start();
    reader.join();
    out.close();
    in.close();

    final String[] lines = reader.getResponse().split("\r\n");

    if (initParam != null) {
        // Only a failure during BEGIN can change the status code. At any
        // later stage the headers, including the 200 status, have already
        // gone out to the client.
        if (SimpleCometServlet.FAIL_ON_BEGIN.equals(initParam)) {
            assertEquals("500", getStatusCode(lines[0]));
            accessLog.validateAccessLog(1, 500, 0, 1000);
        } else {
            assertEquals("HTTP/1.1 200 OK", lines[0]);
            accessLog.validateAccessLog(1, 200, 0, 5000);
        }
        return;
    }

    // No failure injected: check the response line by line.
    // Status line and headers
    assertEquals("HTTP/1.1 200 OK", lines[0]);
    assertEquals("Server: Apache-Coyote/1.1", lines[1]);
    assertTrue(lines[2].startsWith("Set-Cookie: JSESSIONID="));
    assertEquals("Content-Type: text/plain;charset=ISO-8859-1", lines[3]);
    assertEquals("Transfer-Encoding: chunked", lines[4]);
    assertTrue(lines[5].startsWith("Date: "));
    assertEquals("", lines[6]);

    // First chunk: "BEGIN\r\n" is 7 bytes
    int idx = 7;
    assertEquals("7", lines[idx++]);
    assertEquals("BEGIN", lines[idx++]);
    assertEquals("", lines[idx++]);

    // One chunk per PING: "Client: READ: 4 bytes\r\n" is 0x17 bytes
    for (int ping = 0; ping < 4; ping++) {
        assertEquals("17", lines[idx++]);
        assertEquals("Client: READ: 4 bytes", lines[idx++]);
        assertEquals("", lines[idx++]);
    }

    // Final data chunk: "Client: END\r\n" is 0xd bytes, then the terminator
    assertEquals("d", lines[idx++]);
    assertEquals("Client: END", lines[idx++]);
    assertEquals("", lines[idx++]);
    assertEquals("0", lines[idx++]);

    // Nothing may follow: 26 lines in total
    assertEquals(26, lines.length);
}
/**
 * Tests if the Comet connection is closed if the Tomcat connector is
 * stopped.
 * <p>
 * A client keeps sending {@code PING} chunks while the connector is stopped
 * underneath it. The test passes if the servlet saw an END event and END
 * was the last event it saw. The remaining observations are only logged,
 * because they are not fully reliable.
 *
 * @throws Exception if Tomcat cannot be configured, started or stopped, or
 *                   if the client side socket I/O fails
 */
@Test
public void testCometConnectorStop() throws Exception {
    Assume.assumeTrue(
            "This test is skipped, because this connector does not support Comet.",
            isCometSupported());

    // Setup Tomcat instance
    SimpleCometServlet servlet = new SimpleCometServlet();
    Tomcat tomcat = getTomcatInstance();
    Context root = tomcat.addContext("", TEMP_DIR);
    Tomcat.addServlet(root, "comet", servlet);
    root.addServletMapping("/", "comet");
    tomcat.start();

    // Create connection to Comet servlet
    final Socket socket =
            SocketFactory.getDefault().createSocket("localhost", getPort());
    try {
        socket.setSoTimeout(10000);

        final OutputStream os = socket.getOutputStream();
        String requestLine = "POST http://localhost:" + getPort() +
                "/ HTTP/1.1\r\n";
        os.write(requestLine.getBytes());
        os.write("transfer-encoding: chunked\r\n".getBytes());
        os.write("\r\n".getBytes());

        PingWriterThread writeThread = new PingWriterThread(100, os);
        writeThread.start();

        InputStream is = socket.getInputStream();
        ResponseReaderThread readThread = new ResponseReaderThread(is);
        readThread.start();

        // Allow the first couple of PING messages to be written
        Thread.sleep(3000);

        tomcat.getConnector().stop();

        // Wait for the read and write threads to stop
        readThread.join(5000);
        writeThread.join(5000);

        // Destroy the connector once the executor has sent the end event
        tomcat.getConnector().destroy();

        String[] response = readThread.getResponse().split("\r\n");

        // Last line written by the servlet, or empty if there was none.
        // Only a matching line is recorded, so an unrelated line (such as
        // the status line) is never reported as the last message.
        String lastMessage = "";
        for (int i = response.length; --i >= 0;) {
            if (response[i].startsWith("Client:")) {
                lastMessage = response[i];
                break;
            }
        }

        // Last non-empty line of the response, or empty if there was none
        String lastResponseLine = "";
        for (int i = response.length; --i >= 0;) {
            if (response[i].length() > 0) {
                lastResponseLine = response[i];
                break;
            }
        }

        StringBuilder status = new StringBuilder();
        // Expected, but is not 100% reliable:
        // WriteThread exception: java.net.SocketException
        // ReaderThread exception: null
        // Last message: [Client: END]
        // Last response line: [0] (empty chunk)
        // Last comet event: [END]
        // END event occurred: [true]
        status.append("Status:");
        status.append("\nWriterThread exception: ").append(writeThread.getException());
        status.append("\nReaderThread exception: ").append(readThread.getException());
        status.append("\nLast message: [").append(lastMessage).append("]");
        status.append("\nLast response line: [").append(lastResponseLine).append("]");
        status.append("\nLast comet event: [").append(servlet.getLastEvent()).append("]");
        status.append("\nEND event occurred: [").append(servlet.getEndEventOccurred()).append("]");

        // Log at error level when any of the expected observations is missing
        if (writeThread.getException() == null
                || !lastMessage.contains("Client: END")
                || !EventType.END.equals(servlet.getLastEvent())) {
            log.error(status);
        } else {
            log.info(status);
        }

        assertTrue("Comet END event not received", servlet.getEndEventOccurred());
        assertTrue("Comet END event not last event received",
                EventType.END.equals(servlet.getLastEvent()));
    } finally {
        // Release the client socket in every case. This also ends the
        // reader and writer threads if they are still blocked on it.
        socket.close();
    }
}
/**
 * Tells whether the connector of the test Tomcat instance can run the Comet
 * tests, judged by the class name of its protocol handler.
 *
 * @return {@code true} if the protocol handler class name contains
 *         {@code "Nio"} or {@code "Apr"}, {@code false} otherwise
 */
private boolean isCometSupported() {
    final String handlerClassName =
            getTomcatInstance().getConnector().getProtocolHandlerClassName();
    if (handlerClassName.contains("Nio")) {
        return true;
    }
    return handlerClassName.contains("Apr");
}
/**
 * Comet servlet used by the tests. It writes one line to the response for
 * every event it receives, remembers the most recent event type, and can be
 * told through init parameters to throw an {@link IOException} while
 * handling a BEGIN, READ or END event.
 */
private static class SimpleCometServlet extends HttpServlet
        implements CometProcessor {

    private static final long serialVersionUID = 1L;

    /** Init parameter name: throw while handling the BEGIN event. */
    public static final String FAIL_ON_BEGIN = "failOnBegin";
    /** Init parameter name: throw while handling a READ event. */
    public static final String FAIL_ON_READ = "failOnRead";
    /** Init parameter name: throw while handling the END event. */
    public static final String FAIL_ON_END = "failOnEnd";

    private boolean failOnBegin = false;
    private boolean failOnRead = false;
    private boolean failOnEnd = false;

    // Written while handling events and read by the test thread
    private volatile EventType lastEvent;
    private volatile boolean endEventOccurred = false;

    /**
     * @return the type of the event most recently passed to
     *         {@link #event(CometEvent)}, or {@code null} if there was none
     */
    public EventType getLastEvent() {
        return lastEvent;
    }

    /**
     * @return {@code true} once an END event has been received
     */
    public boolean getEndEventOccurred() {
        return endEventOccurred;
    }

    /**
     * Reads the three failure flags from the servlet's init parameters.
     */
    @Override
    public void init() throws ServletException {
        failOnBegin = isFlagSet(FAIL_ON_BEGIN);
        failOnRead = isFlagSet(FAIL_ON_READ);
        failOnEnd = isFlagSet(FAIL_ON_END);
    }

    /**
     * @param paramName init parameter to look up
     * @return {@code true} only if the parameter is present and equals
     *         {@code "true"}, ignoring case
     */
    private boolean isFlagSet(String paramName) {
        return Boolean.parseBoolean(
                getServletConfig().getInitParameter(paramName));
    }

    /**
     * Handles one Comet event: records its type, writes a line describing it
     * to the response and flushes the writer. The event is closed for END
     * and for any type other than BEGIN and READ.
     *
     * @param event the event to handle
     * @throws IOException if a failure was requested for this event type or
     *                     the request/response I/O fails
     */
    @Override
    public void event(CometEvent event) throws IOException,
            ServletException {
        final HttpServletRequest req = event.getHttpServletRequest();
        final HttpServletResponse resp = event.getHttpServletResponse();

        // Make sure a session exists and expires after 30 seconds idle
        final HttpSession httpSession = req.getSession(true);
        httpSession.setMaxInactiveInterval(30);

        final EventType type = event.getEventType();
        lastEvent = type;

        if (type == EventType.BEGIN) {
            if (failOnBegin) {
                throw new IOException("Fail on begin");
            }
            resp.setContentType("text/plain");
            resp.getWriter().print("BEGIN\r\n");
        } else if (type == EventType.READ) {
            if (failOnRead) {
                throw new IOException("Fail on read");
            }
            // Drain whatever is available right now, counting the bytes
            final InputStream body = req.getInputStream();
            int bytesRead = 0;
            for (; body.available() > 0; bytesRead++) {
                body.read();
            }
            final String line = "READ: " + bytesRead + " bytes";
            resp.getWriter().print("Client: " + line + "\r\n");
        } else if (type == EventType.END) {
            // Recorded before a possible failure so the test can see it
            endEventOccurred = true;
            if (failOnEnd) {
                throw new IOException("Fail on end");
            }
            final String line = "END";
            resp.getWriter().print("Client: " + line + "\r\n");
            event.close();
        } else {
            // Any other event type: report it on stdout and to the client
            final String line = type + ":" + event.getEventSubType() + "\r\n";
            System.out.print(line);
            resp.getWriter().print(line);
            event.close();
        }

        resp.getWriter().flush();
    }
}
/**
 * Comet servlet that answers every event with {@code OK} and closes the
 * event within the same call.
 */
private static class CometCloseServlet extends HttpServlet
        implements CometProcessor {

    private static final long serialVersionUID = 1L;

    /**
     * Commits the response headers, writes {@code OK} and closes the event.
     *
     * @param event the event to handle
     * @throws IOException if writing to the response fails
     */
    @Override
    public void event(CometEvent event) throws IOException,
            ServletException {
        final HttpServletResponse resp = event.getHttpServletResponse();
        resp.setContentType("text/plain");

        // Flushing before any body is written forces a chunked response,
        // which is what the test client expects
        resp.flushBuffer();

        resp.getWriter().print("OK");
        event.close();
    }
}
/**
 * Comet servlet that answers every event with {@code OK}, sets a
 * {@code Connection: close} response header and closes the event within
 * the same call.
 */
private static class ConnectionCloseServlet extends HttpServlet
        implements CometProcessor {

    private static final long serialVersionUID = 1L;

    /**
     * Sets the headers, commits them, writes {@code OK} and closes the
     * event.
     *
     * @param event the event to handle
     * @throws IOException if writing to the response fails
     */
    @Override
    public void event(CometEvent event) throws IOException,
            ServletException {
        final HttpServletResponse resp = event.getHttpServletResponse();
        resp.setContentType("text/plain");

        // Disable keep-alive
        resp.setHeader("Connection", "close");

        // Commit the headers before the body is written
        resp.flushBuffer();

        resp.getWriter().print("OK");
        event.close();
    }
}
/**
 * Background writer that sends a chunked request body: {@code pingCount}
 * chunks containing {@code PING}, one second apart, followed by the
 * terminating zero-length chunk. Any exception that stops the writer is
 * kept and can be retrieved with {@link #getException()}.
 */
private static class PingWriterThread extends Thread {

    private final int pingCount;
    private final OutputStream os;
    // Written by this thread, read by the test thread
    private volatile Exception e = null;

    /**
     * @param pingCount number of {@code PING} chunks to send before the
     *                  terminating chunk
     * @param os        stream the chunks are written to
     */
    public PingWriterThread(int pingCount, OutputStream os) {
        this.pingCount = pingCount;
        this.os = os;
    }

    /**
     * @return the exception that ended {@link #run()}, or {@code null} if
     *         none has occurred
     */
    public Exception getException() {
        return e;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < pingCount; i++) {
                os.write("4\r\n".getBytes());
                os.write("PING\r\n".getBytes());
                os.flush();
                Thread.sleep(1000);
            }
            os.write("0\r\n".getBytes());
            os.write("\r\n".getBytes());
            // Flush the terminating chunk just like every PING chunk, so it
            // is not left behind if the stream buffers its output
            os.flush();
        } catch (InterruptedException ie) {
            // Keep the interrupt visible to whoever inspects this thread
            Thread.currentThread().interrupt();
            this.e = ie;
        } catch (Exception e) {
            this.e = e;
        }
    }
}
/**
 * Background reader that collects everything read from an input stream,
 * one byte at a time, until the stream ends or a read fails. The text
 * collected so far can be polled from another thread with
 * {@link #getResponse()} while the reader is still running.
 */
private static class ResponseReaderThread extends Thread {

    private final InputStream is;
    // Shared between the reader thread and the polling test thread. Every
    // access is guarded by the builder's own monitor because StringBuilder
    // is not safe for concurrent use.
    private final StringBuilder response = new StringBuilder();
    // Written by this thread, read by the test thread
    private volatile Exception e = null;

    /**
     * @param is stream to read the response from
     */
    public ResponseReaderThread(InputStream is) {
        this.is = is;
    }

    /**
     * @return the exception that ended {@link #run()}, or {@code null} if
     *         none has occurred
     */
    public Exception getException() {
        return e;
    }

    /**
     * @return a snapshot of the text read so far, never {@code null}
     */
    public String getResponse() {
        synchronized (response) {
            return response.toString();
        }
    }

    @Override
    public void run() {
        try {
            int c = is.read();
            while (c > -1) {
                // Each byte is stored as the character with the same value.
                // The lock is not held across the blocking read.
                synchronized (response) {
                    response.append((char) c);
                }
                c = is.read();
            }
        } catch (Exception e) {
            this.e = e;
        }
    }
}
/**
 * Valve that, for requests that turn out to be Comet requests, starts an
 * {@link AsyncCometCloseThread} to close the Comet event from another
 * thread.
 */
private static class AsyncCometCloseValve extends ValveBase {

    /**
     * Passes the request on to the next valve and afterwards, if the
     * request is a Comet request, starts the thread that closes its event.
     *
     * @param request  the request being processed
     * @param response the response being produced
     */
    @Override
    public void invoke(Request request, Response response)
            throws IOException, ServletException {
        // Created before the request is passed down the pipeline
        final CometEventImpl cometEvent =
                new CometEventImpl(request, response);

        getNext().invoke(request, response);

        if (!request.isComet()) {
            return;
        }
        final Thread closer = new AsyncCometCloseThread(cometEvent);
        closer.start();
    }
}
/**
 * Thread that waits two seconds and then closes the Comet event it was
 * given.
 */
private static class AsyncCometCloseThread extends Thread {

    private final CometEvent event;

    /**
     * @param event the Comet event to close after the delay
     */
    public AsyncCometCloseThread(CometEvent event) {
        this.event = event;
    }

    @Override
    public void run() {
        final long delayMillis = 2000;
        try {
            Thread.sleep(delayMillis);
            this.event.close();
        } catch (Exception ex) {
            // Test should fail. Report what went wrong.
            ex.printStackTrace();
        }
    }
}
}