blob: 847eea42ab5b69afcf7e35e6c59c39003cac7729 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.vysper.xmpp.extension.xep0124;
import org.apache.vysper.xml.fragment.XMLElement;
import org.apache.vysper.xmpp.modules.ServerRuntimeContextService;
import org.apache.vysper.xmpp.protocol.NamespaceURIs;
import org.apache.vysper.xmpp.protocol.StanzaProcessor;
import org.apache.vysper.xmpp.server.ServerRuntimeContext;
import org.apache.vysper.xmpp.server.SessionState;
import org.apache.vysper.xmpp.server.response.ServerResponses;
import org.apache.vysper.xmpp.stanza.Stanza;
import org.apache.vysper.xmpp.stanza.StanzaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.AsyncContext;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Processes the BOSH requests from the clients
* <p>
* This class is thread safe, it handles concurrent BOSH requests safely.
*
* @author The Apache MINA Project (dev@mina.apache.org)
*/
public class BoshHandler implements ServerRuntimeContextService {
private static final Logger LOGGER = LoggerFactory.getLogger(BoshHandler.class);
private ServerRuntimeContext serverRuntimeContext;
private StanzaProcessor stanzaProcessor;
private Map<String, BoshBackedSessionContext> sessions;
private InactivityChecker inactivityChecker;
public BoshHandler() {
// The sessions are stored in a ConcurrentHashMap to maintain the "happens before relationship" memory consistency.
// Although the operations on specific sessions are synchronized, the session creation and retrieval need the memory
// consistency guarantee too.
sessions = new ConcurrentHashMap<String, BoshBackedSessionContext>();
inactivityChecker = new InactivityChecker(this);
inactivityChecker.start();
}
/**
* Getting for the {@link ServerRuntimeContext}
* @return The current {@link ServerRuntimeContext}
*/
public ServerRuntimeContext getServerRuntimeContext() {
return serverRuntimeContext;
}
protected InactivityChecker getInactivityChecker() {
return inactivityChecker;
}
/**
* Setter for the {@link ServerRuntimeContext}
* @param serverRuntimeContext
*/
public void setServerRuntimeContext(ServerRuntimeContext serverRuntimeContext) {
this.serverRuntimeContext = serverRuntimeContext;
}
public void setStanzaProcessor(StanzaProcessor stanzaProcessor){
this.stanzaProcessor = stanzaProcessor;
}
protected BoshBackedSessionContext createSessionContext() {
return new BoshBackedSessionContext(serverRuntimeContext, stanzaProcessor, this, inactivityChecker);
}
/**
* Processes incoming BOSH requests
* @param httpRequest the HTTP request
* @param body the decoded BOSH request
*/
public void process(HttpServletRequest httpRequest, Stanza body) {
if (!body.getNamespaceURI().equalsIgnoreCase(NamespaceURIs.XEP0124_BOSH)) {
LOGGER.error("Invalid namespace for body wrapper '{}', must be '{}'!", body.getNamespaceURI(),
NamespaceURIs.XEP0124_BOSH);
return;
}
if (!body.getName().equalsIgnoreCase("body")) {
LOGGER.error("Invalid body wrapper '{}'!", body.getName());
return;
}
if (body.getAttribute("rid") == null) {
LOGGER.error("Invalid request that does not have a request identifier (rid) attribute!");
return;
}
final long rid;
try {
rid = Long.parseLong(body.getAttributeValue("rid"));
} catch (NumberFormatException e) {
LOGGER.error("not a valid RID: " + body.getAttributeValue("rid"));
// TODO handle properly by returning an error response
throw new RuntimeException(e);
}
if (rid <= 0L) {
LOGGER.warn("rid is not positive: " + rid);
// TODO handle properly by returning an error response
throw new RuntimeException("BOSH rid must be a positive, large number, not " + rid);
}
if (rid > 9007199254740991L) {
LOGGER.warn("rid too large: " + rid);
// continue anyway, this is not a problem with this implementation
}
BoshRequest newBoshRequest = new BoshRequest(httpRequest, body, rid);
LOGGER.debug("SID = " + body.getAttributeValue("sid") + " - new BoshRequest created for RID = " + rid);
// session creation request (first request). does not have a "sid" attribute
if (body.getAttribute("sid") == null) {
try {
createSession(newBoshRequest);
} catch (IOException e) {
LOGGER.error("Exception thrown while processing the session creation request: " + e.getMessage());
try {
final AsyncContext context = newBoshRequest.getHttpServletRequest().startAsync();
final ServletResponse response = context.getResponse();
if (response instanceof HttpServletResponse) {
HttpServletResponse httpServletResponse = (HttpServletResponse)response;
try {
httpServletResponse.sendError(400, "bad-request");
return;
} catch (IOException ioe) {
ioe.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
} else {
// create temporary session to be able to reuse the code
createSessionContext().sendError("bad-request");
}
} catch (Exception exe) {
LOGGER.warn("exception in async processing", exe);
}
}
return;
}
// in-session request, now find the server-side session
final String sid = body.getAttributeValue("sid");
BoshBackedSessionContext session = null;
if (sid != null) session = sessions.get(sid);
if (session == null) {
LOGGER.warn("Received an invalid sid = '{}', terminating connection", sid);
try {
final AsyncContext context = newBoshRequest.getHttpServletRequest().startAsync();
// create temporary session to be able to reuse the code
createSessionContext().sendError("invalid session id");
} catch (Exception exe) {
LOGGER.warn("exception in async processing", exe);
}
return;
}
// process request for the session
synchronized (session) {
session.insertRequest(newBoshRequest);
processIncomingClientStanzas(session, newBoshRequest.getBody());
}
}
protected void processIncomingClientStanzas(BoshBackedSessionContext session, Stanza stanza) {
if (session.getState() == SessionState.ENCRYPTED) {
if (stanza.getInnerElements().isEmpty()) {
// session needs authentication
return;
}
for (XMLElement element : stanza.getInnerElements()) {
if (element.getNamespaceURI().equals(NamespaceURIs.URN_IETF_PARAMS_XML_NS_XMPP_SASL)) {
processStanza(session, element);
}
}
} else if (session.getState() == SessionState.AUTHENTICATED) {
if ("true".equals(stanza.getAttributeValue(NamespaceURIs.URN_XMPP_XBOSH, "restart"))) {
// restart request
session.writeBoshResponse(BoshStanzaUtils.RESTART_BOSH_RESPONSE);
} else {
// any other request
for (XMLElement element : stanza.getInnerElements()) {
processStanza(session, element);
}
// if the client solicited the session termination
if ("terminate".equals(stanza.getAttributeValue("type"))) {
terminateSession(session);
}
}
} else {
LOGGER.debug("processing session while in state = " + session.getState());
}
}
protected void terminateSession(BoshBackedSessionContext session) {
removeSession(session.getSessionId());
session.writeBoshResponse(BoshStanzaUtils.TERMINATE_BOSH_RESPONSE);
session.close();
}
protected void processStanza(BoshBackedSessionContext session, XMLElement element) {
Stanza stanza;
if (element instanceof Stanza) {
stanza = (Stanza) element;
} else {
stanza = new Stanza(element.getNamespaceURI(), element.getName(), element.getNamespacePrefix(),
element.getAttributes(), element.getInnerFragments());
}
stanzaProcessor.processStanza(serverRuntimeContext, session, stanza,
session.getStateHolder());
}
protected void createSession(BoshRequest br) throws IOException {
final Stanza stanza = br.getBody();
if (stanza.getInnerElements().size() > 0) {
// we can see this behavior when the server-side session is aborted, but
// the client keeps sending stanzas
LOGGER.error("session creation request has content.");
throw new IOException("session creation request has content.");
}
BoshBackedSessionContext session = createSessionContext();
if (session.propagateSessionContext() && br.getHttpServletRequest() != null) {
final HttpSession httpSession = br.getHttpServletRequest().getSession(true);
httpSession.setAttribute(BoshBackedSessionContext.HTTP_SESSION_ATTRIBUTE, session);
}
final String contentAttribute = stanza.getAttributeValue("content");
if (contentAttribute != null) session.setContentType(contentAttribute);
String waitAttribute = stanza.getAttributeValue("wait");
if (waitAttribute != null) {
try {
final int wait = Integer.parseInt(waitAttribute);
session.setWait(wait);
} catch (NumberFormatException e) {
LOGGER.warn("wait value is expected to be an Integer, not {}", waitAttribute);
}
}
final String holdAttribute = stanza.getAttributeValue("hold");
if (holdAttribute != null) {
try {
int hold = Integer.parseInt(holdAttribute);
session.setHold(hold);
} catch (NumberFormatException e) {
LOGGER.warn("hold value is expected to be an Integer, not {}", holdAttribute);
}
}
final String versionAttribute = stanza.getAttributeValue("ver");
if (versionAttribute != null) {
try {
session.setBoshVersion(versionAttribute);
} catch (NumberFormatException e) {
LOGGER.warn("bosh version is expected to be of form nn.mm, not {}", versionAttribute);
}
}
final String langAttribute = stanza.getAttributeValue(NamespaceURIs.XML, "lang");
if (langAttribute != null) session.setXMLLang(langAttribute);
final String ackAttribute = stanza.getAttributeValue("ack");
final boolean clientAcknowledgements = "1".equals(ackAttribute);
session.setClientAcknowledgements(clientAcknowledgements);
session.insertRequest(br);
sessions.put(session.getSessionId(), session);
if (LOGGER.isInfoEnabled()) {
StringBuilder logMsg = new StringBuilder();
logMsg.append("BOSH session created with session id = ").append(session.getSessionId());
logMsg.append(", ver = ").append(session.getBoshVersion()).append(" (").append(versionAttribute).append(")");
logMsg.append(", hold = ").append(session.getHold()).append(" (").append(holdAttribute).append(")");
logMsg.append(", wait = ").append(session.getWait()).append(" (").append(waitAttribute).append(")");
logMsg.append(", ack = ").append(session.isClientAcknowledgements()).append(" (").append(ackAttribute).append(")");
LOGGER.info(logMsg.toString());
}
session.writeBoshResponse(getSessionCreationResponse(session));
}
public boolean removeSession(String sessionId) {
return sessions.remove(sessionId) != null;
}
protected Stanza getSessionCreationResponse(BoshBackedSessionContext session) {
StanzaBuilder body = BoshStanzaUtils.createBoshStanzaBuilder();
body.addAttribute("wait", Integer.toString(session.getWait()));
body.addAttribute("inactivity", Integer.toString(session.getInactivity()));
body.addAttribute("polling", Integer.toString(session.getPolling()));
body.addAttribute("requests", Integer.toString(session.getRequests()));
body.addAttribute("hold", Integer.toString(session.getHold()));
body.addAttribute("sid", session.getSessionId());
body.addAttribute("ver", session.getBoshVersion());
body.addAttribute("from", session.getServerJID().getFullQualifiedName());
body.addAttribute("secure", "true");
body.addAttribute("maxpause", Integer.toString(session.getMaxPause()));
// adding the ack attribute here is needed because when responding to o request with the same RID (as is the case here)
// the ack would not be included on BoshBackedSessionContext#write0, but this first ack is required.
body.addAttribute("ack", Long.toString(session.requestsWindow.getHighestContinuousRid()));
Stanza features = new ServerResponses().getFeaturesForAuthentication(serverRuntimeContext.getServerFeatures()
.getAuthenticationMethods(), session);
body.addPreparedElement(features);
return body.build();
}
public int getActiveSessionsCount() {
return sessions.size();
}
public String getServiceName() {
return this.getClass().getName();
}
}