blob: 36fd06548329d37aeb083df2949ca9122eae6262 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tomcat.bayeux;
import java.io.IOException;
import javax.servlet.ServletConfig;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
import org.apache.catalina.comet.CometEvent;
import org.apache.catalina.comet.CometProcessor;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.apache.cometd.bayeux.Bayeux;
/**
*
* @author Guy Molinari
* @version 1.0
*/
public class BayeuxServlet implements CometProcessor {
/**
* Attribute to hold the TomcatBayeux object in the servlet context
*/
public static final String TOMCAT_BAYEUX_ATTR = Bayeux.DOJOX_COMETD_BAYEUX;
/**
* Logger object
*/
private static final Log log = LogFactory.getLog(BayeuxServlet.class);
/**
* Servlet config - for future use
*/
protected ServletConfig servletConfig;
/**
* Reference to the global TomcatBayeux object
*/
protected TomcatBayeux tb;
/**
* Upon servlet destruction, the servlet will clean up the
* TomcatBayeux object and terminate any outstanding events.
*/
public void destroy() {
servletConfig = null;
//to do, close all outstanding comet events
//tb.destroy();
tb = null;//TO DO, close everything down
}
/**
* Returns the preconfigured connection timeout.
* If no timeout has been configured as a servlet init parameter named <code>timeout</code>
* then the default of 2min will be used.
* @return int - the timeout for a connection in milliseconds
*/
protected int getTimeout() {
String timeoutS = servletConfig.getInitParameter("timeout");
int timeout = 120*1000; //2 min
try {
timeout = Integer.parseInt(timeoutS);
}catch (NumberFormatException nfe) {
//ignore, we have a default value
}
return timeout;
}
protected int getReconnectInterval() {
String rs = servletConfig.getInitParameter("reconnectInterval");
int rct = 1000; //1 seconds
try {
rct = Integer.parseInt(rs);
}catch (NumberFormatException nfe) {
//ignore, we have a default value
}
return rct;
}
public void event(CometEvent cometEvent) throws IOException, ServletException {
CometEvent.EventType type = cometEvent.getEventType();
if (log.isDebugEnabled()) {
log.debug("["+Thread.currentThread().getName()+"] Received Comet Event type="+type+" subtype:"+cometEvent.getEventSubType());
}
synchronized (cometEvent) {
if (type==CometEvent.EventType.BEGIN) {
//begin event, set the timeout
cometEvent.setTimeout(getTimeout());
//checkBayeux(cometEvent); - READ event should always come
} else if (type==CometEvent.EventType.READ) {
checkBayeux(cometEvent);
} else if (type==CometEvent.EventType.ERROR) {
tb.remove(cometEvent);
cometEvent.close();
} else if (type==CometEvent.EventType.END) {
tb.remove(cometEvent);
cometEvent.close();
}//end if
}//synchronized
}//event
/**
*
* @param cometEvent CometEvent
* @return boolean - true if we comet event stays open
* @throws IOException
* @throws UnsupportedOperationException
*/
protected void checkBayeux(CometEvent cometEvent) throws IOException, UnsupportedOperationException {
//we actually have data.
//data can be text/json or
if (Bayeux.JSON_CONTENT_TYPE.equals(cometEvent.getHttpServletRequest().getContentType())) {
//read and decode the bytes according to content length
log.warn("["+Thread.currentThread().getName()+"] JSON encoding not supported, will throw an exception and abort the request.");
int contentlength = cometEvent.getHttpServletRequest().getContentLength();
throw new UnsupportedOperationException("Decoding "+Bayeux.JSON_CONTENT_TYPE+" not yet implemented.");
} else { //GET method or application/x-www-form-urlencoded
String message = cometEvent.getHttpServletRequest().getParameter(Bayeux.MESSAGE_PARAMETER);
if (log.isTraceEnabled()) {
log.trace("["+Thread.currentThread().getName()+"] Received JSON message:"+message);
}
try {
int action = handleBayeux(message, cometEvent);
if (log.isDebugEnabled()) {
log.debug("["+Thread.currentThread().getName()+"] Bayeux handling complete, action result="+action);
}
if (action<=0) {
cometEvent.close();
}
}catch (Exception x) {
x.printStackTrace();
tb.remove(cometEvent);
log.error(x);
cometEvent.close();
}
}
}
protected int handleBayeux(String message, CometEvent event) throws IOException, ServletException {
int result = 0;
if (message==null || message.length()==0) return result;
try {
BayeuxRequest request = null;
//a message can be an array of messages
JSONArray jsArray = new JSONArray(message);
for (int i = 0; i < jsArray.length(); i++) {
JSONObject msg = jsArray.getJSONObject(i);
if (log.isDebugEnabled()) {
log.debug("["+Thread.currentThread().getName()+"] Processing bayeux message:"+msg);
}
request = RequestFactory.getRequest(tb,event,msg);
if (log.isDebugEnabled()) {
log.debug("["+Thread.currentThread().getName()+"] Processing bayeux message using request:"+request);
}
result = request.process(result);
if (log.isDebugEnabled()) {
log.debug("["+Thread.currentThread().getName()+"] Processing bayeux message result:"+result);
}
}
if (result>0 && request!=null) {
event.getHttpServletRequest().setAttribute(BayeuxRequest.LAST_REQ_ATTR, request);
ClientImpl ci = (ClientImpl)tb.getClient(((RequestBase)request).getClientId());
ci.addCometEvent(event);
if (log.isDebugEnabled()) {
log.debug("["+Thread.currentThread().getName()+"] Done bayeux message added to request attribute");
}
} else if (result == 0 && request!=null) {
RequestBase.deliver(event,(ClientImpl)tb.getClient(((RequestBase)request).getClientId()));
if (log.isDebugEnabled()) {
log.debug("["+Thread.currentThread().getName()+"] Done bayeux message, delivered to client");
}
}
}catch (JSONException x) {
log.error(x);//to do impl error handling
result = -1;
}catch (BayeuxException x) {
log.error(x); //to do impl error handling
result = -1;
}
return result;
}
public ServletConfig getServletConfig() {
return servletConfig;
}
public String getServletInfo() {
return "Tomcat/BayeuxServlet/1.0";
}
public void init(ServletConfig servletConfig) throws ServletException {
this.servletConfig = servletConfig;
ServletContext ctx = servletConfig.getServletContext();
if (ctx.getAttribute(TOMCAT_BAYEUX_ATTR)==null)
ctx.setAttribute(TOMCAT_BAYEUX_ATTR,new TomcatBayeux());
this.tb = (TomcatBayeux)ctx.getAttribute(TOMCAT_BAYEUX_ATTR);
tb.setReconnectInterval(getReconnectInterval());
}
public void service(ServletRequest servletRequest, ServletResponse servletResponse) throws ServletException, IOException {
if (servletResponse instanceof HttpServletResponse) {
( (HttpServletResponse) servletResponse).sendError(500, "Misconfigured Tomcat server, must be configured to support Comet operations.");
} else {
throw new ServletException("Misconfigured Tomcat server, must be configured to support Comet operations for the Bayeux protocol.");
}
}
}