blob: e0f2fb7eb068b753af11721724c6224774679a5d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.waveprotocol.wave.federation.xmpp;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.MapMaker;
import com.google.inject.Inject;
import com.typesafe.config.Config;
import org.dom4j.Element;
import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
import org.waveprotocol.wave.federation.FederationErrors;
import org.xmpp.packet.IQ;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Provides abstraction between Federation-specific code and the backing XMPP
* transport, including support for reliable outgoing calls (i.e. calls that are
* guaranteed to time out) and sending error responses.
*
* TODO(thorogood): Find a better name for this class. Suggestions include
* PacketHandler, Switchbox, TransportConnector, ReliableRouter, ...
*
* @author thorogood@google.com (Sam Thorogood)
*/
public class XmppManager implements IncomingPacketHandler {
private static final Logger LOG = Logger.getLogger(XmppManager.class.getCanonicalName());
/**
* Inner static class representing a single outgoing call.
*/
private static class OutgoingCall {
final Class<? extends Packet> responseType;
PacketCallback callback;
ScheduledFuture<?> timeout;
OutgoingCall(Class<? extends Packet> responseType, PacketCallback callback) {
this.responseType = responseType;
this.callback = callback;
}
void start(ScheduledFuture<?> timeout) {
Preconditions.checkState(this.timeout == null);
this.timeout = timeout;
}
}
/**
* Inner non-static class representing a single incoming call. These are not
* cancellable and do not time out; this is just a helper class so success and
* failure responses may be more cleanly invoked.
*/
private class IncomingCallback implements PacketCallback {
private final Packet request;
private boolean complete = false;
IncomingCallback(Packet request) {
this.request = request;
}
@Override
public void error(FederationError error) {
Preconditions.checkState(!complete,
"Must not callback multiple times for incoming packet: %s", request);
complete = true;
sendErrorResponse(request, error);
}
@Override
public void run(Packet response) {
Preconditions.checkState(!complete,
"Must not callback multiple times for incoming packet: %s", request);
// TODO(thorogood): Check outgoing response versus stored incoming request
// to ensure that to/from are paired correctly?
complete = true;
transport.sendPacket(response);
}
}
// Injected types that handle incoming XMPP packet types.
private final XmppFederationHost host;
private final XmppFederationRemote remote;
private final XmppDisco disco;
private final OutgoingPacketTransport transport;
private final String jid;
// Pending callbacks to outgoing requests.
private final ConcurrentMap<String, OutgoingCall> callbacks = new MapMaker().makeMap();
private final ScheduledExecutorService timeoutExecutor =
Executors.newSingleThreadScheduledExecutor();
@Inject
public XmppManager(XmppFederationHost host, XmppFederationRemote remote, XmppDisco disco,
OutgoingPacketTransport transport, Config config) {
this.host = host;
this.remote = remote;
this.disco = disco;
this.transport = transport;
this.jid = config.getString("federation.xmpp_jid");
// Configure all related objects with this manager. Eventually, this should
// be replaced by better Guice interface bindings.
host.setManager(this);
remote.setManager(this);
disco.setManager(this);
}
@Override
public void receivePacket(final Packet packet) {
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Received incoming XMPP packet:\n" + packet);
}
if (packet instanceof IQ) {
IQ iq = (IQ) packet;
if (iq.getType().equals(IQ.Type.result) || iq.getType().equals(IQ.Type.error)) {
// Result type, hand off to callback handler.
response(packet);
} else {
processIqGetSet(iq);
}
} else if (packet instanceof Message) {
Message message = (Message) packet;
if (message.getType().equals(Message.Type.error)
|| message.getChildElement("received", XmppNamespace.NAMESPACE_XMPP_RECEIPTS) != null) {
// Response type, hand off to callback handler.
response(packet);
} else {
processMessage(message);
}
} else {
sendErrorResponse(packet, FederationError.Code.BAD_REQUEST, "Unhandled packet type: "
+ packet.getElement().getQName().getName());
}
}
/**
* Populate the given request subclass of Packet and return it.
*/
private <V extends Packet> V createRequest(V packet, String toJid) {
packet.setTo(toJid);
packet.setID(XmppUtil.generateUniqueId());
packet.setFrom(jid);
return packet;
}
/**
* Create a request IQ stanza with the given toJid.
*
* @param toJid target JID
* @return new IQ stanza
*/
public IQ createRequestIQ(String toJid) {
return createRequest(new IQ(), toJid);
}
/**
* Create a request Message stanza with the given toJid.
*
* @param toJid target JID
* @return new Message stanza
*/
public Message createRequestMessage(String toJid) {
return createRequest(new Message(), toJid);
}
/**
* Sends the given XMPP packet over the backing transport. This accepts a
* callback which is guaranteed to be invoked at a later point, either through
* a normal response, error response, or timeout.
*
* @param packet packet to be sent
* @param callback callback to be invoked on response or timeout
* @param timeout timeout, in seconds, for this callback
*/
public void send(Packet packet, final PacketCallback callback, int timeout) {
final String key = packet.getID() + "#" + packet.getTo() + "#" + packet.getFrom();
final OutgoingCall call = new OutgoingCall(packet.getClass(), callback);
if (callbacks.putIfAbsent(key, call) == null) {
// Timeout runnable to be invoked on packet expiry.
Runnable timeoutTask = new Runnable() {
@Override
public void run() {
if (callbacks.remove(key, call)) {
callback.error(
FederationErrors.newFederationError(FederationError.Code.REMOTE_SERVER_TIMEOUT));
} else {
// Likely race condition where success has actually occurred. Ignore.
}
}
};
call.start(timeoutExecutor.schedule(timeoutTask, timeout, TimeUnit.SECONDS));
transport.sendPacket(packet);
} else {
String msg = "Could not send packet, ID already in-flight: " + key;
LOG.warning(msg);
// Invoke the callback with an internal error.
callback.error(
FederationErrors.newFederationError(FederationError.Code.UNDEFINED_CONDITION, msg));
}
}
/**
* Cause an immediate timeout for the given packet, which is presumed to have
* already been sent via {@link #send}.
*/
@VisibleForTesting
void causeImmediateTimeout(Packet packet) {
String key = packet.getID() + "#" + packet.getTo() + "#" + packet.getFrom();
OutgoingCall call = callbacks.remove(key);
if (call != null) {
call.callback.error(FederationErrors.newFederationError(
FederationError.Code.REMOTE_SERVER_TIMEOUT, "Forced immediate timeout"));
}
}
/**
* Invoke the callback for a packet already identified as a response. This may
* either invoke the error or normal callback as necessary.
*/
private void response(Packet packet) {
String key = packet.getID() + "#" + packet.getFrom() + "#" + packet.getTo();
OutgoingCall call = callbacks.remove(key);
if (call == null) {
LOG.warning("Received response packet without paired request: " + packet.getID());
} else {
// Cancel the outstanding timeout.
call.timeout.cancel(false);
// Look for error condition and invoke the relevant callback.
Element element = packet.getElement().element("error");
if (element != null) {
LOG.fine("Invoking error callback for: " + packet.getID());
call.callback.error(toFederationError(new PacketError(element)));
} else {
if (call.responseType.equals(packet.getClass())) {
LOG.fine("Invoking normal callback for: " + packet.getID());
call.callback.run(packet);
} else {
String msg =
"Received mismatched response packet type: expected " + call.responseType
+ ", given " + packet.getClass();
LOG.warning(msg);
call.callback.error(FederationErrors.newFederationError(
FederationError.Code.UNDEFINED_CONDITION, msg));
}
}
// Clear call's reference to callback, otherwise callback only
// becomes eligible for GC once the timeout expires, because
// timeoutExecutor holds on to the call object till then, even
// though we cancelled the timeout.
call.callback = null;
}
}
/**
* Process IQ request stanzas. This encompasses XMPP disco, submit and history
* requests/responses, and get/post signer info requests/responses.
*/
private void processIqGetSet(IQ iq) {
Element body = iq.getChildElement();
if (body == null) {
sendErrorResponse(iq, FederationErrors.badRequest("Malformed request, no IQ child"));
return;
}
final String namespace = body.getQName().getNamespace().getURI();
final boolean isIQSet;
if (iq.getType().equals(IQ.Type.get)) {
isIQSet = false;
} else if (iq.getType().equals(IQ.Type.set)) {
isIQSet = true;
} else {
throw new IllegalArgumentException("Can only process an IQ get/set.");
}
PacketCallback responseCallback = new IncomingCallback(iq);
if (namespace.equals(XmppNamespace.NAMESPACE_PUBSUB)) {
final Element pubsub = iq.getChildElement();
final Element element = pubsub.element(isIQSet ? "publish" : "items");
if (element.attributeValue("node").equals("wavelet")) {
if (isIQSet) {
host.processSubmitRequest(iq, responseCallback);
} else {
host.processHistoryRequest(iq, responseCallback);
}
} else if (element.attributeValue("node").equals("signer")) {
if (isIQSet) {
host.processPostSignerRequest(iq, responseCallback);
} else {
host.processGetSignerRequest(iq, responseCallback);
}
} else {
sendErrorResponse(iq, FederationError.Code.BAD_REQUEST, "Unhandled pubsub request");
}
} else if (!isIQSet) {
switch (namespace) {
case XmppNamespace.NAMESPACE_DISCO_INFO:
disco.processDiscoInfoGet(iq, responseCallback);
break;
case XmppNamespace.NAMESPACE_DISCO_ITEMS:
disco.processDiscoItemsGet(iq, responseCallback);
break;
default:
sendErrorResponse(iq, FederationError.Code.BAD_REQUEST, "Unhandled IQ get");
break;
}
} else {
sendErrorResponse(iq, FederationError.Code.BAD_REQUEST, "Unhandled IQ set");
}
}
/**
* Processes Message stanzas. This encompasses wavelet updates, update acks,
* and ping messages.
*/
private void processMessage(Message message) {
if (message.getChildElement("event", XmppNamespace.NAMESPACE_PUBSUB_EVENT) != null) {
remote.update(message, new IncomingCallback(message));
} else if (message.getChildElement("ping", XmppNamespace.NAMESPACE_WAVE_SERVER) != null) {
// Respond inline to the ping.
LOG.info("Responding to ping from: " + message.getFrom());
Message response = XmppUtil.createResponseMessage(message);
response.addChildElement("received", XmppNamespace.NAMESPACE_XMPP_RECEIPTS);
transport.sendPacket(response);
} else {
sendErrorResponse(message, FederationError.Code.BAD_REQUEST, "Unhandled message type");
}
}
/**
* Helper method to send generic error responses, backed onto
* {@link #sendErrorResponse(Packet, FederationError)}.
*/
void sendErrorResponse(Packet request, FederationError.Code code) {
sendErrorResponse(request, FederationErrors.newFederationError(code));
}
/**
* Helper method to send error responses, backed onto
* {@link #sendErrorResponse(Packet, FederationError)}.
*/
void sendErrorResponse(Packet request, FederationError.Code code, String text) {
sendErrorResponse(request, FederationErrors.newFederationError(code, text));
}
/**
* Send an error request to the passed incoming request.
*
* @param request packet request, target is derived from its to/from
* @param error error to be contained in response
*/
void sendErrorResponse(Packet request, FederationError error) {
if (error.getErrorCode() == FederationError.Code.OK) {
throw new IllegalArgumentException("Can't send an error of OK!");
}
sendErrorResponse(request, toPacketError(error));
}
/**
* Send an error response to the passed incoming request. Throws
* IllegalArgumentException if the original packet is also an error, or is of
* the IQ result type.
*
* According to RFC 3920 (9.3.1), the error packet may contain the original
* packet. However, this implementation does not include it.
*
* @param request packet request, to/from is inverted for response
* @param error packet error describing error condition
*/
void sendErrorResponse(Packet request, PacketError error) {
if (request instanceof IQ) {
IQ.Type type = ((IQ) request).getType();
if (!(type.equals(IQ.Type.get) || type.equals(IQ.Type.set))) {
throw new IllegalArgumentException("May only return an error to IQ get/set, not: " + type);
}
} else if (request instanceof Message) {
Message message = (Message) request;
if (message.getType().equals(Message.Type.error)) {
throw new IllegalArgumentException("Can't return an error to another message error");
}
} else {
throw new IllegalArgumentException("Unexpected Packet subclass, expected Message/IQ: "
+ request.getClass());
}
LOG.fine("Sending error condition in response to " + request.getID() + ": "
+ error.getCondition().name());
// Note that this does not include the original packet; just the ID.
final Packet response = XmppUtil.createResponsePacket(request);
response.setError(error);
transport.sendPacket(response);
}
/**
* Convert a FederationError instance to a PacketError. This may return
* <undefined-condition> if the incoming error can't be understood.
*
* @param error the incoming error
* @return a generated PacketError instance
* @throws IllegalArgumentException if the OK error code is given
*/
private static PacketError toPacketError(FederationError error) {
Preconditions.checkArgument(error.getErrorCode() != FederationError.Code.OK);
String tag = error.getErrorCode().name().toLowerCase().replace('_', '-');
PacketError.Condition condition;
try {
condition = PacketError.Condition.fromXMPP(tag);
} catch (IllegalArgumentException e) {
condition = PacketError.Condition.undefined_condition;
LOG.warning("Did not understand error condition, defaulting to: " + condition.name());
}
PacketError result = new PacketError(condition);
if (error.hasErrorMessage()) {
// TODO(thorogood): Hide this behind a flag so we don't always broadcast error cases.
result.setText(error.getErrorMessage(), "en");
}
return result;
}
/**
* Convert a PacketError instance to an internal FederationError. This may
* return an error code of UNDEFINED_CONDITION if the incoming error can't be
* understood.
*
* @param error the incoming PacketError
* @return the generated FederationError instance
*/
private static FederationError toFederationError(PacketError error) {
String tag = error.getCondition().name().toUpperCase().replace('-', '_');
FederationError.Code code;
try {
code = FederationError.Code.valueOf(tag);
} catch (IllegalArgumentException e) {
code = FederationError.Code.UNDEFINED_CONDITION;
}
FederationError.Builder builder = FederationError.newBuilder().setErrorCode(code);
if (error.getText() != null) {
builder.setErrorMessage(error.getText());
}
return builder.build();
}
}