| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.web; |
| |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.io.StringWriter; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| |
| import javax.jms.Destination; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.ObjectMessage; |
| import javax.jms.TextMessage; |
| import javax.servlet.ServletConfig; |
| import javax.servlet.ServletException; |
| import javax.servlet.http.HttpServletRequest; |
| import javax.servlet.http.HttpServletResponse; |
| import javax.servlet.http.HttpSession; |
| |
| import org.apache.activemq.MessageAvailableConsumer; |
| import org.eclipse.jetty.continuation.Continuation; |
| import org.eclipse.jetty.continuation.ContinuationListener; |
| import org.eclipse.jetty.continuation.ContinuationSupport; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A servlet for sending and receiving messages to/from JMS destinations using |
| * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the |
| * destination and whether it is a topic or queue via configuration details on |
| * the servlet or as request parameters. <p/> For reading messages you can |
| * specify a readTimeout parameter to determine how long the servlet should |
| * block for. The servlet can be configured with the following init parameters: |
| * <dl> |
| * <dt>defaultReadTimeout</dt> |
| * <dd>The default time in ms to wait for messages. May be overridden by a |
| * request using the 'timeout' parameter</dd> |
| * <dt>maximumReadTimeout</dt> |
| * <dd>The maximum value a request may specify for the 'timeout' parameter</dd> |
| * <dt>maximumMessages</dt> |
| * <dd>maximum messages to send per response</dd> |
| * <dt></dt> |
| * <dd></dd> |
| * </dl> |
| * |
| * |
| */ |
| @SuppressWarnings("serial") |
| public class MessageListenerServlet extends MessageServletSupport { |
| private static final Logger LOG = LoggerFactory.getLogger(MessageListenerServlet.class); |
| |
| private final String readTimeoutParameter = "timeout"; |
| private long defaultReadTimeout = -1; |
| private long maximumReadTimeout = 25000; |
| private int maximumMessages = 100; |
| private final Timer clientCleanupTimer = new Timer("ActiveMQ Ajax Client Cleanup Timer", true); |
| private final HashMap<String,AjaxWebClient> ajaxWebClients = new HashMap<String,AjaxWebClient>(); |
| |
| @Override |
| public void init() throws ServletException { |
| ServletConfig servletConfig = getServletConfig(); |
| String name = servletConfig.getInitParameter("defaultReadTimeout"); |
| if (name != null) { |
| defaultReadTimeout = asLong(name); |
| } |
| name = servletConfig.getInitParameter("maximumReadTimeout"); |
| if (name != null) { |
| maximumReadTimeout = asLong(name); |
| } |
| name = servletConfig.getInitParameter("maximumMessages"); |
| if (name != null) { |
| maximumMessages = (int)asLong(name); |
| } |
| clientCleanupTimer.schedule( new ClientCleaner(), 5000, 60000 ); |
| } |
| |
| /** |
| * Sends a message to a destination or manage subscriptions. If the the |
| * content type of the POST is |
| * <code>application/x-www-form-urlencoded</code>, then the form |
| * parameters "destination", "message" and "type" are used to pass a message |
| * or a subscription. If multiple messages or subscriptions are passed in a |
| * single post, then additional parameters are shortened to "dN", "mN" and |
| * "tN" where N is an index starting from 1. The type is either "send", |
| * "listen" or "unlisten". For send types, the message is the text of the |
| * TextMessage, otherwise it is the ID to be used for the subscription. If |
| * the content type is not <code>application/x-www-form-urlencoded</code>, |
| * then the body of the post is sent as the message to a destination that is |
| * derived from a query parameter, the URL or the default destination. |
| * |
| * @param request |
| * @param response |
| * @throws ServletException |
| * @throws IOException |
| */ |
| @Override |
| protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { |
| |
| // lets turn the HTTP post into a JMS Message |
| AjaxWebClient client = getAjaxWebClient( request ); |
| String messageIds = ""; |
| |
| synchronized (client) { |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("POST client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " info=" + request.getPathInfo() + " contentType=" + request.getContentType()); |
| // dump(request.getParameterMap()); |
| } |
| |
| int messages = 0; |
| |
| // loop until no more messages |
| while (true) { |
| // Get the message parameters. Multiple messages are encoded |
| // with more compact parameter names. |
| String destinationName = request.getParameter(messages == 0 ? "destination" : ("d" + messages)); |
| |
| if (destinationName == null) { |
| destinationName = request.getHeader("destination"); |
| } |
| |
| String message = request.getParameter(messages == 0 ? "message" : ("m" + messages)); |
| String type = request.getParameter(messages == 0 ? "type" : ("t" + messages)); |
| |
| if (destinationName == null || message == null || type == null) { |
| break; |
| } |
| |
| try { |
| Destination destination = getDestination(client, request, destinationName); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(messages + " destination=" + destinationName + " message=" + message + " type=" + type); |
| LOG.debug(destination + " is a " + destination.getClass().getName()); |
| } |
| |
| messages++; |
| |
| if ("listen".equals(type)) { |
| AjaxListener listener = client.getListener(); |
| Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap(); |
| Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap(); |
| client.closeConsumer(destination); // drop any existing |
| // consumer. |
| MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName)); |
| |
| consumer.setAvailableListener(listener); |
| consumerIdMap.put(consumer, message); |
| consumerDestinationNameMap.put(consumer, destinationName); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Subscribed: " + consumer + " to " + destination + " id=" + message); |
| } |
| } else if ("unlisten".equals(type)) { |
| Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap(); |
| Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap(); |
| MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName)); |
| |
| consumer.setAvailableListener(null); |
| consumerIdMap.remove(consumer); |
| consumerDestinationNameMap.remove(consumer); |
| client.closeConsumer(destination); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Unsubscribed: " + consumer); |
| } |
| } else if ("send".equals(type)) { |
| TextMessage text = client.getSession().createTextMessage(message); |
| appendParametersToMessage(request, text); |
| |
| client.send(destination, text); |
| messageIds += text.getJMSMessageID() + "\n"; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Sent " + message + " to " + destination); |
| } |
| } else { |
| LOG.warn("unknown type " + type); |
| } |
| |
| } catch (JMSException e) { |
| LOG.warn("jms", e); |
| } |
| } |
| } |
| |
| if ("true".equals(request.getParameter("poll"))) { |
| try { |
| // TODO return message IDs |
| doMessages(client, request, response); |
| } catch (JMSException e) { |
| throw new ServletException("JMS problem: " + e, e); |
| } |
| } else { |
| // handle simple POST of a message |
| if (request.getContentLength() != 0 && (request.getContentType() == null || !request.getContentType().toLowerCase().startsWith("application/x-www-form-urlencoded"))) { |
| try { |
| Destination destination = getDestination(client, request); |
| String body = getPostedMessageBody(request); |
| TextMessage message = client.getSession().createTextMessage(body); |
| appendParametersToMessage(request, message); |
| |
| client.send(destination, message); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Sent to destination: " + destination + " body: " + body); |
| } |
| messageIds += message.getJMSMessageID() + "\n"; |
| } catch (JMSException e) { |
| throw new ServletException(e); |
| } |
| } |
| |
| response.setContentType("text/plain"); |
| response.setHeader("Cache-Control", "no-cache"); |
| response.getWriter().print(messageIds); |
| } |
| } |
| |
| /** |
| * Supports a HTTP DELETE to be equivlanent of consuming a singe message |
| * from a queue |
| */ |
| @Override |
| protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { |
| try { |
| AjaxWebClient client = getAjaxWebClient(request); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("GET client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " uri=" + request.getRequestURI() + " query=" + request.getQueryString()); |
| } |
| |
| doMessages(client, request, response); |
| } catch (JMSException e) { |
| throw new ServletException("JMS problem: " + e, e); |
| } |
| } |
| |
| /** |
| * Reads a message from a destination up to some specific timeout period |
| * |
| * @param client The webclient |
| * @param request |
| * @param response |
| * @throws ServletException |
| * @throws IOException |
| */ |
| protected void doMessages(AjaxWebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException { |
| |
| int messages = 0; |
| // This is a poll for any messages |
| |
| long timeout = getReadTimeout(request); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("doMessage timeout=" + timeout); |
| } |
| |
| // this is non-null if we're resuming the continuation. |
| // attributes set in AjaxListener |
| UndeliveredAjaxMessage undelivered_message = null; |
| Message message = null; |
| undelivered_message = (UndeliveredAjaxMessage)request.getAttribute("undelivered_message"); |
| if( undelivered_message != null ) { |
| message = undelivered_message.getMessage(); |
| } |
| |
| synchronized (client) { |
| |
| List<MessageConsumer> consumers = client.getConsumers(); |
| MessageAvailableConsumer consumer = null; |
| if( undelivered_message != null ) { |
| consumer = (MessageAvailableConsumer)undelivered_message.getConsumer(); |
| } |
| |
| if (message == null) { |
| // Look for a message that is ready to go |
| for (int i = 0; message == null && i < consumers.size(); i++) { |
| consumer = (MessageAvailableConsumer)consumers.get(i); |
| if (consumer.getAvailableListener() == null) { |
| continue; |
| } |
| |
| // Look for any available messages |
| message = consumer.receive(10); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("received " + message + " from " + consumer); |
| } |
| } |
| } |
| |
| // prepare the response |
| response.setContentType("text/xml"); |
| response.setHeader("Cache-Control", "no-cache"); |
| |
| if (message == null && client.getListener().getUndeliveredMessages().size() == 0) { |
| Continuation continuation = ContinuationSupport.getContinuation(request); |
| |
| // Add a listener to the continuation to make sure it actually |
| // will expire (seems like a bug in Jetty Servlet 3 continuations, |
| // see https://issues.apache.org/jira/browse/AMQ-3447 |
| continuation.addContinuationListener(new ContinuationListener() { |
| @Override |
| public void onTimeout(Continuation cont) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Continuation " + cont.toString() + " expired."); |
| } |
| } |
| |
| @Override |
| public void onComplete(Continuation cont) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Continuation " + cont.toString() + " completed."); |
| } |
| } |
| }); |
| |
| if (continuation.isExpired()) { |
| response.setStatus(HttpServletResponse.SC_OK); |
| StringWriter swriter = new StringWriter(); |
| PrintWriter writer = new PrintWriter(swriter); |
| writer.println("<ajax-response>"); |
| writer.print("</ajax-response>"); |
| |
| writer.flush(); |
| String m = swriter.toString(); |
| response.getWriter().println(m); |
| |
| return; |
| } |
| |
| continuation.setTimeout(timeout); |
| continuation.suspend(); |
| LOG.debug( "Suspending continuation " + continuation ); |
| |
| // Fetch the listeners |
| AjaxListener listener = client.getListener(); |
| listener.access(); |
| |
| // register this continuation with our listener. |
| listener.setContinuation(continuation); |
| |
| return; |
| } |
| |
| StringWriter swriter = new StringWriter(); |
| PrintWriter writer = new PrintWriter(swriter); |
| |
| Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap(); |
| Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap(); |
| response.setStatus(HttpServletResponse.SC_OK); |
| writer.println("<ajax-response>"); |
| |
| // Send any message we already have |
| if (message != null) { |
| String id = consumerIdMap.get(consumer); |
| String destinationName = consumerDestinationNameMap.get(consumer); |
| LOG.debug( "sending pre-existing message" ); |
| writeMessageResponse(writer, message, id, destinationName); |
| |
| messages++; |
| } |
| |
| // send messages buffered while continuation was unavailable. |
| LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages(); |
| LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages"); |
| synchronized( undeliveredMessages ) { |
| for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext();) { |
| messages++; |
| UndeliveredAjaxMessage undelivered = it.next(); |
| Message msg = undelivered.getMessage(); |
| consumer = (MessageAvailableConsumer)undelivered.getConsumer(); |
| String id = consumerIdMap.get(consumer); |
| String destinationName = consumerDestinationNameMap.get(consumer); |
| LOG.debug( "sending undelivered/buffered messages" ); |
| LOG.debug( "msg:" +msg+ ", id:" +id+ ", destinationName:" +destinationName); |
| writeMessageResponse(writer, msg, id, destinationName); |
| it.remove(); |
| if (messages >= maximumMessages) { |
| break; |
| } |
| } |
| } |
| |
| // Send the rest of the messages |
| for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) { |
| consumer = (MessageAvailableConsumer)consumers.get(i); |
| if (consumer.getAvailableListener() == null) { |
| continue; |
| } |
| |
| // Look for any available messages |
| while (messages < maximumMessages) { |
| message = consumer.receiveNoWait(); |
| if (message == null) { |
| break; |
| } |
| messages++; |
| String id = consumerIdMap.get(consumer); |
| String destinationName = consumerDestinationNameMap.get(consumer); |
| LOG.debug( "sending final available messages" ); |
| writeMessageResponse(writer, message, id, destinationName); |
| } |
| } |
| |
| writer.print("</ajax-response>"); |
| |
| writer.flush(); |
| String m = swriter.toString(); |
| response.getWriter().println(m); |
| } |
| } |
| |
| protected void writeMessageResponse(PrintWriter writer, Message message, String id, String destinationName) throws JMSException, IOException { |
| writer.print("<response id='"); |
| writer.print(id); |
| writer.print("'"); |
| if (destinationName != null) { |
| writer.print(" destination='" + destinationName + "' "); |
| } |
| writer.print(">"); |
| if (message instanceof TextMessage) { |
| TextMessage textMsg = (TextMessage)message; |
| String txt = textMsg.getText(); |
| if (txt != null) { |
| if (txt.startsWith("<?")) { |
| txt = txt.substring(txt.indexOf("?>") + 2); |
| } |
| writer.print(txt); |
| } |
| } else if (message instanceof ObjectMessage) { |
| ObjectMessage objectMsg = (ObjectMessage)message; |
| Object object = objectMsg.getObject(); |
| if (object != null) { |
| writer.print(object.toString()); |
| } |
| } |
| writer.println("</response>"); |
| } |
| |
| /* |
| * Return the AjaxWebClient for this session+clientId. |
| * Create one if it does not already exist. |
| */ |
| protected AjaxWebClient getAjaxWebClient( HttpServletRequest request ) { |
| HttpSession session = request.getSession(true); |
| |
| String clientId = request.getParameter( "clientId" ); |
| // if user doesn't supply a 'clientId', we'll just use a default. |
| if( clientId == null ) { |
| clientId = "defaultAjaxWebClient"; |
| } |
| String sessionKey = session.getId() + '-' + clientId; |
| |
| AjaxWebClient client = null; |
| synchronized (ajaxWebClients) { |
| client = ajaxWebClients.get( sessionKey ); |
| // create a new AjaxWebClient if one does not already exist for this sessionKey. |
| if( client == null ) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug( "creating new AjaxWebClient in "+sessionKey ); |
| } |
| client = new AjaxWebClient( request, maximumReadTimeout ); |
| ajaxWebClients.put( sessionKey, client ); |
| } |
| client.updateLastAccessed(); |
| } |
| return client; |
| } |
| |
| /** |
| * @return the timeout value for read requests which is always >= 0 and <= |
| * maximumReadTimeout to avoid DoS attacks |
| */ |
| protected long getReadTimeout(HttpServletRequest request) { |
| long answer = defaultReadTimeout; |
| |
| String name = request.getParameter(readTimeoutParameter); |
| if (name != null) { |
| answer = asLong(name); |
| } |
| if (answer < 0 || answer > maximumReadTimeout) { |
| answer = maximumReadTimeout; |
| } |
| return answer; |
| } |
| |
| /* |
| * an instance of this class runs every minute (started in init), to clean up old web clients & free resources. |
| */ |
| private class ClientCleaner extends TimerTask { |
| @Override |
| public void run() { |
| if( LOG.isDebugEnabled() ) { |
| LOG.debug( "Cleaning up expired web clients." ); |
| } |
| |
| synchronized( ajaxWebClients ) { |
| Iterator<Map.Entry<String, AjaxWebClient>> it = ajaxWebClients.entrySet().iterator(); |
| while ( it.hasNext() ) { |
| Map.Entry<String,AjaxWebClient> e = it.next(); |
| String key = e.getKey(); |
| AjaxWebClient val = e.getValue(); |
| if ( LOG.isDebugEnabled() ) { |
| LOG.debug( "AjaxWebClient " + key + " last accessed " + val.getMillisSinceLastAccessed()/1000 + " seconds ago." ); |
| } |
| // close an expired client and remove it from the ajaxWebClients hash. |
| if( val.closeIfExpired() ) { |
| if ( LOG.isDebugEnabled() ) { |
| LOG.debug( "Removing expired AjaxWebClient " + key ); |
| } |
| it.remove(); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void destroy() { |
| // make sure we cancel the timer |
| clientCleanupTimer.cancel(); |
| super.destroy(); |
| } |
| } |