blob: f281d96af2ddfa444393041453bc4783a54ce95b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.vysper.xmpp.extension.xep0124;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang.StringUtils;
import org.apache.vysper.xml.fragment.Renderer;
import org.apache.vysper.xml.fragment.XMLElement;
import org.apache.vysper.xmpp.protocol.SessionStateHolder;
import org.apache.vysper.xmpp.protocol.StanzaProcessor;
import org.apache.vysper.xmpp.server.AbstractSessionContext;
import org.apache.vysper.xmpp.server.ServerRuntimeContext;
import org.apache.vysper.xmpp.server.SessionState;
import org.apache.vysper.xmpp.stanza.Stanza;
import org.apache.vysper.xmpp.stanza.StanzaBuilder;
import org.apache.vysper.xmpp.writer.StanzaWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Keeps the session state of a BOSH client
*
* @author The Apache MINA Project (dev@mina.apache.org)
*/
public class BoshBackedSessionContext extends AbstractSessionContext implements StanzaWriter {
private final static Logger LOGGER = LoggerFactory.getLogger(BoshBackedSessionContext.class);
public final static String HTTP_SESSION_ATTRIBUTE = "org.apache.vysper.xmpp.extension.xep0124.BoshBackedSessionContext";
public final static String BOSH_REQUEST_ATTRIBUTE = "boshRequest";
public final static String BOSH_RESPONSE_ATTRIBUTE = "boshResponse";
protected static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
private final int maxpauseSeconds = 120;
private final int inactivitySeconds = 60;
private final int pollingSeconds = 15;
private final int maximumSentResponses = 100;
/*
* The number of milliseconds that will have to pass for a response to be reported missing to the client by
* responding with a report and time attributes. See Response Acknowledgements in XEP-0124.
*/
private final int brokenConnectionReportTimeoutMillis = 1000;
/*
* Keeps the suspended HTTP requests (does not respond to them) until the server has an asynchronous message
* to send to the client. (Comet HTTP Long Polling technique - described in XEP-0124)
*
* The BOSH requests are sorted by their RIDs.
*/
protected final RequestsWindow requestsWindow;
/*
* Keeps the asynchronous messages sent from server that cannot be delivered to the client because there are
* no available HTTP requests to respond to (requestsWindow is empty).
*/
private final Queue<Stanza> delayedResponseQueue = new LinkedList<Stanza>();
/*
* A cache of sent responses to the BOSH client, kept in the event of delivery failure and retransmission requests.
* these sent responses are moved to the sentResponsesBacklog when the client acks their receival.
* See Broken Connections in XEP-0124.
*/
private final SortedMap<Long, BoshResponse> sentResponses = new TreeMap<Long, BoshResponse>();
/**
* backlog of sent responses which have been acked by the client (and thus shouldn't ever been needed to be resent.
*/
private final ResponsesBuffer sentResponsesBacklog = new ResponsesBuffer();
private int parallelRequestsCount = 2;
private String boshVersion = "1.9";
private String contentType = BoshServlet.XML_CONTENT_TYPE;
private int wait = 60;
private int hold = 1;
private int currentInactivitySeconds = inactivitySeconds;
/**
* must be synchronized along with requestsWindow
*/
private Long latestEmptyPollingRequestTimestamp = 0L;
/*
* Indicate if the BOSH client will use acknowledgements throughout the session and that the absence of an 'ack'
* attribute in any request is meaningful.
*/
private boolean clientAcknowledgements;
/*
* The timestamp of the latest response wrote to the client is used to measure the inactivity period.
* When reaching the maximum inactivity the session will automatically close.
*/
private long latestWriteTimestamp = System.currentTimeMillis();
private final InactivityChecker inactivityChecker;
private Long lastInactivityExpireTime;
private boolean isWatchedByInactivityChecker;
private boolean propagateSessionContextToHTTPSession = false;
/**
* Creates a new context for a session
* @param serverRuntimeContext
* @param stanzaProcessor
* @param inactivityChecker
*/
public BoshBackedSessionContext(ServerRuntimeContext serverRuntimeContext, StanzaProcessor stanzaProcessor, BoshHandler boshHandler, InactivityChecker inactivityChecker) {
super(serverRuntimeContext, stanzaProcessor, new SessionStateHolder());
// in BOSH we jump directly to the encrypted state
sessionStateHolder.setState(SessionState.ENCRYPTED);
requestsWindow = new RequestsWindow(getSessionId());
this.inactivityChecker = inactivityChecker;
updateInactivityChecker();
}
public boolean isWatchedByInactivityChecker() {
return isWatchedByInactivityChecker;
}
private synchronized void updateInactivityChecker() {
Long newInactivityExpireTime = null;
if (requestsWindow.isEmpty()) {
newInactivityExpireTime = latestWriteTimestamp + currentInactivitySeconds * 1000;
if (newInactivityExpireTime.equals(lastInactivityExpireTime)) {
return;
}
} else if (!isWatchedByInactivityChecker) {
return;
}
isWatchedByInactivityChecker = inactivityChecker.updateExpireTime(this, lastInactivityExpireTime, newInactivityExpireTime);
lastInactivityExpireTime = newInactivityExpireTime;
}
public SessionStateHolder getStateHolder() {
return sessionStateHolder;
}
public StanzaWriter getResponseWriter() {
return this;
}
public void setIsReopeningXMLStream() {
// BOSH does not use XML streams, the BOSH equivalent for reopening an XML stream is to restart the BOSH connection,
// and this is done in BoshHandler when the client requests it
}
/**
* true, iff this session context will be stored to the related BOSH HTTP session.
* @return
*/
public boolean propagateSessionContext() {
return propagateSessionContextToHTTPSession;
}
/*
* This method is synchronized on the session object to prevent concurrent writes to the same BOSH client
*/
synchronized public void write(Stanza stanza) {
if (stanza == null) throw new IllegalArgumentException("stanza must not be null.");
LOGGER.debug("SID = " + getSessionId() + " - adding server stanza for writing to BOSH client");
writeBoshResponse(BoshStanzaUtils.wrapStanza(stanza));
}
/**
* Writes a server-to-client XMPP stanza as a BOSH response (wrapped in a &lt;body/&gt; element) if there are
* available HTTP requests to respond to, otherwise the response is queued to be sent later
* (when a HTTP request becomes available).
* <p>
* (package access)
*
* @param responseStanza The BOSH response to write
*/
/*package*/ void writeBoshResponse(Stanza responseStanza) {
if (responseStanza == null) throw new IllegalArgumentException();
final boolean isEmtpyResponse = responseStanza == BoshStanzaUtils.EMPTY_BOSH_RESPONSE;
final ArrayList<BoshRequest> boshRequestsForRID = new ArrayList<BoshRequest>(1);
BoshResponse boshResponse;
final Long rid;
synchronized (requestsWindow) {
BoshRequest req = requestsWindow.pollNext();
if (req == null) {
if (isEmtpyResponse) return; // do not delay empty responses, everything's good.
// delay sending until request comes available
final boolean accepted = delayedResponseQueue.offer(responseStanza);
if (!accepted) {
LOGGER.debug("SID = " + getSessionId() + " - stanza not queued. BOSH delayedResponseQueue is full: {}",
delayedResponseQueue.size());
// TODO do not silently drop this stanza
}
return;
}
rid = req.getRid();
// in rare cases, we have same RID in two separate requests
boshRequestsForRID.add(req);
// collect more requests for this RID
while (rid.equals(requestsWindow.firstRid())) {
final BoshRequest sameRidRequest = requestsWindow.pollNext();
boshRequestsForRID.add(sameRidRequest);
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - multi requests ({}) per RID.", rid, boshRequestsForRID.size());
}
long highestContinuousRid = requestsWindow.getHighestContinuousRid();
final Long ack = rid.equals(highestContinuousRid) ? null : highestContinuousRid;
boshResponse = getBoshResponse(responseStanza, ack);
if (LOGGER.isDebugEnabled()) {
String emptyHint = isEmtpyResponse ? "empty " : StringUtils.EMPTY;
LOGGER.debug("SID = " + getSessionId() + " - rid = " + rid + " - BOSH writing {}response: {}", emptyHint, new String(boshResponse.getContent()));
}
}
synchronized (sentResponses) {
if (isResponseSavable(boshRequestsForRID.get(0), responseStanza)) {
sentResponses.put(rid, boshResponse);
// The number of responses to non-pause requests kept in the buffer SHOULD be either the same as the maximum
// number of simultaneous requests allowed or, if Acknowledgements are being used, the number of responses
// that have not yet been acknowledged (this part is handled in insertRequest(BoshRequest)), or
// the hard limit maximumSentResponses (not in the specification) that prevents excessive memory consumption.
if (sentResponses.size() > maximumSentResponses || (!isClientAcknowledgements() && sentResponses.size() > parallelRequestsCount)) {
final Long key = sentResponses.firstKey();
sentResponsesBacklog.add(key, sentResponses.remove(key));
}
}
}
if (sentResponses.size() > maximumSentResponses + 10) {
synchronized (sentResponses) {
LOGGER.warn("stored sent responses ({}) exeeds maximum ({}). purging.", sentResponses.size(), maximumSentResponses);
while (sentResponses.size() > maximumSentResponses) {
final Long key = sentResponses.firstKey();
sentResponsesBacklog.add(key, sentResponses.remove(key));
}
}
}
for (BoshRequest boshRequest : boshRequestsForRID) {
try {
final AsyncContext asyncContext = saveResponse(boshRequest, boshResponse);
asyncContext.dispatch();
} catch (Exception e) {
LOGGER.warn("SID = " + getSessionId() + " - exception in async processing rid = {}", boshRequest.getRid(), e);
}
}
setLatestWriteTimestamp();
updateInactivityChecker();
}
private String logRIDSequence() {
final String logMsg = requestsWindow.logRequestWindow();
return logMsg + ", sent buffer = " + sentResponses.size();
}
private void setLatestWriteTimestamp() {
latestWriteTimestamp = System.currentTimeMillis();
}
private static boolean isResponseSavable(BoshRequest req, Stanza response) {
// responses to pause requests are not saved
if (req.getBody().getAttributeValue("pause") != null) {
return false;
}
// responses with binding error are not saved
for (XMLElement element : response.getInnerElements()) {
if ("iq".equals(element.getName()) && "error".equals(element.getAttributeValue("type"))) {
for (XMLElement subelement : element.getInnerElements()) {
if ("bind".equals(subelement.getName())) {
return false;
}
}
}
}
return true;
}
public void sendError(String condition) {
sendError(null, condition);
}
/**
* Writes an error to the client and closes the connection
* @param condition the error condition
*/
protected void sendError(BoshRequest req, String condition) {
req = req == null ? requestsWindow.pollNext() : req;
if (req == null) {
LOGGER.warn("SID = " + getSessionId() + " - no request for sending BOSH error " + condition);
endSession(SessionTerminationCause.CONNECTION_ABORT);
return;
}
final Long rid = req.getRid();
Stanza body = BoshStanzaUtils.createTerminateResponse(condition).build();
BoshResponse boshResponse = getBoshResponse(body, null);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("SID = " + getSessionId() + " - rid = {} - BOSH writing response: {}", rid, new String(boshResponse.getContent()));
}
try {
final AsyncContext asyncContext = saveResponse(req, boshResponse);
asyncContext.dispatch();
} catch (Exception e) {
LOGGER.warn("SID = " + getSessionId() + " - exception in async processing", e);
}
close();
}
/*
* Terminates the BOSH session
*/
public void close() {
// respond to all the queued HTTP requests with termination responses
synchronized (requestsWindow) {
BoshRequest next;
while ((next = requestsWindow.pollNext()) != null) {
Stanza body = BoshStanzaUtils.TERMINATE_BOSH_RESPONSE;
BoshResponse boshResponse = getBoshResponse(body, null);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("SID = " + getSessionId() + " - rid = {} - BOSH writing response: {}", next.getRid(), new String(boshResponse.getContent()));
}
try {
final AsyncContext asyncContext = saveResponse(next, boshResponse);
asyncContext.dispatch();
} catch (Exception e) {
LOGGER.warn("SID = " + getSessionId() + " - exception in async processing", e);
}
}
inactivityChecker.updateExpireTime(this, lastInactivityExpireTime, null);
lastInactivityExpireTime = null;
}
LOGGER.info("SID = " + getSessionId() + " - session closed");
}
public void switchToTLS(boolean delayed, boolean clientTls) {
// BOSH cannot switch dynamically (because STARTTLS cannot be used with HTTP),
// SSL can be enabled/disabled in BoshEndpoint#setSSLEnabled()
}
/**
* Setter for the Content-Type header of the HTTP responses sent to the BOSH client associated with this session
* @param contentType
*/
public void setContentType(String contentType) {
this.contentType = contentType;
}
/**
* Getter for the Content-Type header
* @return the configured Content-Type
*/
public String getContentType() {
return contentType;
}
/**
* Getter for the maximum length of a temporary session pause (in seconds) that a client can request
* @return
*/
public int getMaxPause() {
return maxpauseSeconds;
}
/**
* Setter for the BOSH 'wait' parameter, the longest time (in seconds) that the connection manager is allowed to
* wait before responding to any request during the session. The BOSH client can only configure this parameter to
* a lower value than the default value from this session context.
* @param wait the BOSH 'wait' parameter
*/
public void setWait(int wait) {
this.wait = Math.min(wait, this.wait);
}
/**
* Getter for the BOSH 'wait' parameter
* @return The BOSH 'wait' parameter
*/
public int getWait() {
return wait;
}
/**
* Setter for the BOSH 'hold' parameter, the maximum number of HTTP requests the connection manager is allowed to
* keep waiting at any one time during the session. The value of this parameter can trigger the modification of
* the BOSH 'requests' parameter.
* @param hold
*/
public void setHold(int hold) {
this.hold = hold;
if (hold >= 2) {
parallelRequestsCount = hold + 1;
}
}
/**
* Getter for the BOSH 'hold' parameter
* @return the BOSH 'hold' parameter
*/
public int getHold() {
return hold;
}
/**
* Setter for the client acknowledgements throughout the session
* @param value true is enabled, false otherwise
*/
public void setClientAcknowledgements(boolean value) {
clientAcknowledgements = value;
}
/**
* Getter for client acknowledgements
* @return true if enabled, false otherwise
*/
public boolean isClientAcknowledgements() {
return clientAcknowledgements;
}
/**
* Setter for the highest version of the BOSH protocol that the connection manager supports, or the version
* specified by the client in its request, whichever is lower.
* @param version the BOSH version
*/
public void setBoshVersion(String version) throws NumberFormatException {
try {
String[] v = boshVersion.split("\\.");
int major = Integer.parseInt(v[0]);
int minor = Integer.parseInt(v[1]);
v = version.split("\\.");
if (v.length == 2) {
int clientMajor = Integer.parseInt(v[0]);
int clientMinor = Integer.parseInt(v[1]);
if (clientMajor < major || (clientMajor == major && clientMinor < minor)) {
boshVersion = version;
}
}
} catch (NumberFormatException e) {
throw e;
}
}
/**
* Getter for the BOSH protocol version
* @return the BOSH version
*/
public String getBoshVersion() {
return boshVersion;
}
/**
* Getter for the BOSH 'inactivity' parameter, the longest allowable inactivity period (in seconds).
* @return the BOSH 'inactivity' parameter
*/
public int getInactivity() {
return inactivitySeconds;
}
/**
* Getter for the BOSH 'polling' parameter, the shortest allowable polling interval (in seconds).
* @return the BOSH 'polling' parameter
*/
public int getPolling() {
return pollingSeconds;
}
/**
* Getter for the BOSH 'requests' parameter, the limit number of simultaneous requests the client makes.
* @return the BOSH 'requests' parameter
*/
public int getRequests() {
return parallelRequestsCount;
}
/*
* A request expires when it stays enqueued in the requestsWindow longer than the allowed 'wait' time.
* The synchronization on the session object ensures that there will be no concurrent writes or other concurrent
* expirations for the BOSH client while the current request expires.
*/
private void requestExpired(final AsyncContext context) {
final BoshRequest req =
(BoshRequest) context.getRequest().getAttribute(BOSH_REQUEST_ATTRIBUTE);
if (req == null) {
LOGGER.warn("SID = " + getSessionId() + " - Continuation expired without having "
+ "an associated request!");
return;
}
LOGGER.debug("SID = " + getSessionId() + " - rid = {} - BOSH request expired", req.getRid());
while (!requestsWindow.isEmpty() && requestsWindow.firstRid() <= req.getRid()) {
writeBoshResponse(BoshStanzaUtils.EMPTY_BOSH_RESPONSE);
}
}
/**
* Suspends and enqueues an HTTP request to be used later when an asynchronous message needs to be sent from
* the connection manager to the BOSH client.
*
* @param req the HTTP request
*/
public void insertRequest(final BoshRequest br) {
final Stanza boshOuterBody = br.getBody();
final Long rid = br.getRid();
LOGGER.debug("SID = " + getSessionId() + " - rid = {} - inserting new BOSH request", rid);
// reset the inactivity
currentInactivitySeconds = inactivitySeconds;
final HttpServletRequest request = br.getHttpServletRequest();
request.setAttribute(BOSH_REQUEST_ATTRIBUTE, br);
final AsyncContext context = request.startAsync();
addContinuationExpirationListener(context);
context.setTimeout(this.wait * 1000);
// allow two more parallel request, be generous in what you receive
final int maxToleratedParallelRequests = parallelRequestsCount + 2;
synchronized (requestsWindow) {
// only allow 'parallelRequestsCount' request to be queued
final long highestContinuousRid = requestsWindow.getHighestContinuousRid();
if (highestContinuousRid != -1 && rid > highestContinuousRid + maxToleratedParallelRequests) {
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - received RID >= the permitted window of concurrent requests ({})",
rid, highestContinuousRid);
// don't queue // queueRequest(br);
sendError(br, "item-not-found");
return;
}
// resend missed responses
final boolean resend = rid <= requestsWindow.getCurrentProcessingRequest();
if (resend) {
// OLD: if (highestContinuousRid != null && rid <= highestContinuousRid) {
synchronized (sentResponses) {
if (LOGGER.isInfoEnabled()) {
final String pendingRids = requestsWindow.logRequestWindow();
final String sentRids = logSentResponsesBuffer();
LOGGER.info("SID = " + getSessionId() + " - rid = {} - resend request. sent buffer: {} - req.win.: " + pendingRids, rid, sentRids);
}
if (sentResponses.containsKey(rid)) {
LOGGER.info("SID = " + getSessionId() + " - rid = {} (re-sending)", rid);
// Resending the old response
resendResponse(br);
} else {
// not in sent responses, try alternatives: backlog and requestWindow
final BoshResponse response = sentResponsesBacklog.lookup(rid);
if (response != null) {
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH response retrieved from sentResponsesBacklog", rid);
resendResponse(br, rid, response);
return; // no error
}
// rid not in sent responses, nor backlog. check to see if rid is still in requests window
boolean inRequestsWindow = requestsWindow.containsRid(rid);
if (!inRequestsWindow) {
if (LOGGER.isWarnEnabled()) {
final String sentRids = logSentResponsesBuffer();
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH response not in buffer error - " + sentRids, rid);
}
} else {
if (LOGGER.isWarnEnabled()) {
final String sentRids = logSentResponsesBuffer();
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH response still in requests window - " + sentRids, rid);
}
}
sendError(br, "item-not-found");
}
}
return;
}
// check for too many parallel requests
final boolean terminate = "terminate".equals(boshOuterBody.getAttributeValue("type"));
final boolean pause = boshOuterBody.getAttributeValue("pause") != null;
final boolean bodyIsEmpty = boshOuterBody.getInnerElements().isEmpty();
final int distinctRIDs = requestsWindow.getDistinctRIDs();
if (distinctRIDs >= maxToleratedParallelRequests && !terminate && !pause) {
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH Overactivity: Too many simultaneous requests, max = {} " + logRIDSequence(), rid, maxToleratedParallelRequests);
sendError(br, "policy-violation");
return;
}
// check for new request comes early
if (distinctRIDs + 1 == maxToleratedParallelRequests && !terminate && !pause && bodyIsEmpty) {
final long millisSinceLastCalls = Math.abs(br.getTimestamp() - requestsWindow.getLatestAddionTimestamp());
if (millisSinceLastCalls < pollingSeconds * 1000 && !rid.equals(requestsWindow.getLatestRID())) {
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH Overactivity: Too frequent requests, millis since requests = {}, " + logRIDSequence(), rid, millisSinceLastCalls);
sendError(br, "policy-violation");
return;
}
}
// check
if ((wait == 0 || hold == 0) && bodyIsEmpty) {
final long millisBetweenEmptyReqs = Math.abs(br.getTimestamp() - latestEmptyPollingRequestTimestamp);
if (millisBetweenEmptyReqs < pollingSeconds * 1000 && !rid.equals(requestsWindow.getLatestRID())) {
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH Overactivity for polling: Too frequent requests, millis since requests = {}, " + logRIDSequence(), rid, millisBetweenEmptyReqs);
sendError(br, "policy-violation");
return;
}
latestEmptyPollingRequestTimestamp = br.getTimestamp();
}
queueRequest(br);
}
if (isClientAcknowledgements()) {
synchronized (sentResponses) {
if (boshOuterBody.getAttribute("ack") == null) {
// if there is no ack attribute present then the client confirmed it received all the responses to all the previous requests
// and we clear the cache
sentResponsesBacklog.addAll(sentResponses);
sentResponses.clear();
} else if (!sentResponses.isEmpty()) {
// After receiving a request with an 'ack' value less than the 'rid' of the last request that it has already responded to,
// the connection manager MAY inform the client of the situation. In this case it SHOULD include a 'report' attribute set
// to one greater than the 'ack' attribute it received from the client, and a 'time' attribute set to the number of milliseconds
// since it sent the response associated with the 'report' attribute.
long ack = Long.parseLong(boshOuterBody.getAttributeValue("ack"));
if (ack < sentResponses.lastKey() && sentResponses.containsKey(ack + 1)) {
long delta = System.currentTimeMillis() - sentResponses.get(ack + 1).getTimestamp();
if (delta >= brokenConnectionReportTimeoutMillis) {
sendBrokenConnectionReport(ack + 1, delta);
return;
}
}
}
}
}
// we cannot pause if there are missing requests, this is tested with
// br.getRid().equals(requestsWindow.lastKey()) && highestContinuousRid.equals(br.getRid())
synchronized (requestsWindow) {
final String pauseAttribute = boshOuterBody.getAttributeValue("pause");
if (pauseAttribute != null &&
rid.equals(requestsWindow.getLatestRID()) &&
rid.equals(requestsWindow.getHighestContinuousRid())) {
int pause;
try {
pause = Integer.parseInt(pauseAttribute);
} catch (NumberFormatException e) {
queueRequest(br);
sendError("bad-request");
return;
}
pause = Math.max(0, pause);
pause = Math.min(pause, maxpauseSeconds);
respondToPause(pause);
return;
}
}
// If there are delayed responses waiting to be sent to the BOSH client, then we wrap them all in
// a <body/> element and send them as a HTTP response to the current HTTP request.
Stanza delayedResponse;
ArrayList<Stanza> mergeCandidates = null; // do not create until there is a delayed response
while ((delayedResponse = delayedResponseQueue.poll()) != null) {
if (mergeCandidates == null) mergeCandidates = new ArrayList<Stanza>();
mergeCandidates.add(delayedResponse);
}
Stanza mergedResponse = BoshStanzaUtils.mergeResponses(mergeCandidates);
if (mergedResponse != null) {
LOGGER.debug("SID = " + getSessionId() + " - writing merged response. stanzas merged = " + mergeCandidates.size());
writeBoshResponse(mergedResponse);
return;
}
// If there are more suspended enqueued requests than it is allowed by the BOSH 'hold' parameter,
// than we release the oldest one by sending an empty response.
if (requestsWindow.size() > hold) {
writeBoshResponse(BoshStanzaUtils.EMPTY_BOSH_RESPONSE);
}
}
public String logSentResponsesBuffer() {
final StringBuffer logMsg = new StringBuffer("sent = [");
for (Iterator<Long> iterator = sentResponses.keySet().iterator(); iterator.hasNext(); ) {
Long sentRid = iterator.next();
logMsg.append(sentRid);
if (iterator.hasNext()) logMsg.append(", ");
}
logMsg.append("]");
return logMsg.toString();
}
protected void respondToPause(int pause) {
LOGGER.debug("SID = " + getSessionId() + " - Setting inactivity period to {}", pause);
currentInactivitySeconds = pause;
while (!requestsWindow.isEmpty()) {
writeBoshResponse(BoshStanzaUtils.EMPTY_BOSH_RESPONSE);
}
}
protected void sendBrokenConnectionReport(long report, long delta) {
StanzaBuilder stanzaBuilder = BoshStanzaUtils.createBrokenSessionReport(report, delta);
writeBoshResponse(stanzaBuilder.build());
}
protected void addContinuationExpirationListener(final AsyncContext context) {
// listen the continuation to be notified when the request expires
context.addListener(new AsyncListener() {
public void onTimeout(AsyncEvent event) throws IOException {
requestExpired(context);
}
public void onStartAsync(AsyncEvent event) throws IOException {
// ignore
}
public void onError(AsyncEvent event) throws IOException {
handleAsyncEventError(event);
}
public void onComplete(AsyncEvent event) throws IOException {
// ignore
}
});
}
protected void handleAsyncEventError(AsyncEvent event) {
final ServletRequest suppliedRequest = event.getSuppliedRequest();
final ServletResponse suppliedResponse = event.getSuppliedResponse();
BoshRequest boshRequest = (BoshRequest)suppliedRequest.getAttribute(BOSH_REQUEST_ATTRIBUTE);
BoshResponse boshResponse = (BoshResponse)suppliedRequest.getAttribute(BOSH_RESPONSE_ATTRIBUTE);
// works at least for jetty:
final Exception exceptionObject = (Exception)suppliedRequest.getAttribute("javax.servlet.error.exception");
final Throwable throwable = event.getThrowable() != null ? event.getThrowable() : exceptionObject;
final Long rid = boshRequest == null ? null : boshRequest.getRid();
LOGGER.warn("SID = " + getSessionId() + " - JID = " + getInitiatingEntity() + " - RID = " + rid + " - async error on event ", event.getClass(), throwable);
}
protected void resendResponse(BoshRequest br) {
final Long rid = br.getRid();
BoshResponse boshResponse = sentResponses.get(rid);
resendResponse(br, rid, boshResponse);
}
protected void resendResponse(BoshRequest br, Long rid, BoshResponse boshResponse) {
if (boshResponse == null) {
LOGGER.debug("SID = " + getSessionId() + " - rid = {} - BOSH response could not (no longer) be retrieved for resending.", rid);
return;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("SID = " + getSessionId() + " - rid = {} - BOSH writing response (resending): {}", rid, new String(boshResponse.getContent()));
}
try {
final AsyncContext asyncContext = saveResponse(br, boshResponse);
asyncContext.dispatch();
} catch (Exception e) {
LOGGER.warn("SID = " + getSessionId() + " - exception in async processing", e);
}
setLatestWriteTimestamp();
updateInactivityChecker();
}
protected BoshResponse getBoshResponse(Stanza stanza, Long ack) {
if (ack != null) {
stanza = BoshStanzaUtils.addAttribute(stanza, "ack", ack.toString());
}
byte[] content = new Renderer(stanza).getComplete().getBytes(UTF8_CHARSET);
return new BoshResponse(contentType, content);
}
protected void queueRequest(BoshRequest br) {
requestsWindow.queueRequest(br);
updateInactivityChecker();
}
protected AsyncContext saveResponse(final BoshRequest boshRequest, final BoshResponse boshResponse) {
if (boshResponse == null) throw new IllegalArgumentException("boshResponse cannot be null");
final HttpServletRequest request = boshRequest.getHttpServletRequest();
final AsyncContext asyncContext = request.getAsyncContext();
request.setAttribute(BOSH_RESPONSE_ATTRIBUTE, boshResponse);
return asyncContext;
}
}