// blob: 749d975d173f8c40fa6968e178c5cf469392ea02 [file] [log] [blame]
package org.apache.vysper.xmpp.server.s2s;
import java.nio.channels.UnresolvedAddressException;
import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.ssl.SslFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.apache.vysper.mina.MinaBackedSessionContext;
import org.apache.vysper.mina.StanzaLoggingFilter;
import org.apache.vysper.mina.codec.XMPPProtocolCodecFactory;
import org.apache.vysper.xmpp.addressing.Entity;
import org.apache.vysper.xmpp.delivery.failure.RemoteServerNotFoundException;
import org.apache.vysper.xmpp.delivery.failure.RemoteServerTimeoutException;
import org.apache.vysper.xmpp.modules.extension.xep0119_xmppping.XmppPingListener;
import org.apache.vysper.xmpp.modules.extension.xep0119_xmppping.XmppPingModule;
import org.apache.vysper.xmpp.modules.extension.xep0220_server_dailback.DbResultHandler;
import org.apache.vysper.xmpp.modules.extension.xep0220_server_dailback.DbVerifyHandler;
import org.apache.vysper.xmpp.modules.extension.xep0220_server_dailback.DialbackIdGenerator;
import org.apache.vysper.xmpp.protocol.NamespaceURIs;
import org.apache.vysper.xmpp.protocol.ResponseStanzaContainer;
import org.apache.vysper.xmpp.protocol.SessionStateHolder;
import org.apache.vysper.xmpp.protocol.StanzaHandler;
import org.apache.vysper.xmpp.server.ServerRuntimeContext;
import org.apache.vysper.xmpp.server.SessionContext;
import org.apache.vysper.xmpp.server.SessionState;
import org.apache.vysper.xmpp.server.XMPPVersion;
import org.apache.vysper.xmpp.server.response.ServerResponses;
import org.apache.vysper.xmpp.server.s2s.XmppEndpointResolver.ResolvedAddress;
import org.apache.vysper.xmpp.stanza.Stanza;
import org.apache.vysper.xmpp.stanza.StanzaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Outgoing server-to-server connector to a single remote XMPP server.
 *
 * <p>{@link #start()} resolves the remote server's addresses, tries them in order and
 * blocks until the XMPP handshake has reached {@link SessionState#AUTHENTICATED} or every
 * address has failed. Once authenticated, and if the {@link XmppPingModule} is installed,
 * the remote server is pinged periodically; a ping timeout closes the connector.
 *
 * <p>If a dialback session context is supplied at construction time, the connector is used
 * for dialback verification only and is marked authenticated as soon as the remote stream
 * opens.
 *
 * <p>Threading: {@code start()} runs on the caller's thread, the I/O handler callbacks run on
 * MINA I/O threads and pings are scheduled on a {@link Timer} thread. The {@code closed} and
 * {@code connector} fields are therefore {@code volatile}.
 */
public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerConnector {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultXMPPServerConnector.class);

    private final ServerRuntimeContext serverRuntimeContext;

    /** Session context of the most recently opened socket; assigned in {@code sessionOpened}. */
    private MinaBackedSessionContext sessionContext;

    private final Entity otherServer;

    private final SessionStateHolder sessionStateHolder = new SessionStateHolder();

    /** Connector of the current connection attempt, or {@code null} while none is active. */
    private volatile IoConnector connector;

    /** Maximum wait for the TCP connect, in milliseconds. */
    private int connectTimeout = 30000;

    /** Maximum wait for the XMPP handshake after the TCP connect, in milliseconds. */
    private int xmppHandshakeTimeout = 30000;

    /** Delay before the first ping and interval between pings, in milliseconds. */
    private int pingPeriod = 30000;

    /** Timeout value handed to the ping module for each ping. */
    private int pingTimeout = 10000;

    private volatile boolean closed = false;

    private final SessionContext dialbackSessionContext;

    private final SessionStateHolder dialbackSessionStateHolder;

    /** Created lazily by {@link #startPinging()}; stays {@code null} if pinging never starts. */
    private Timer pingTimer;

    /**
     * Creates a connector; no network activity happens until {@link #start()} is called.
     *
     * @param otherServer the remote server to connect to
     * @param serverRuntimeContext runtime context of the local server
     * @param dialbackSessionContext session context of the session being verified via dialback,
     *        or {@code null} if this connector is not used for dialback verification
     * @param dialbackSessionStateHolder state holder belonging to {@code dialbackSessionContext}
     */
    public DefaultXMPPServerConnector(Entity otherServer, ServerRuntimeContext serverRuntimeContext, SessionContext dialbackSessionContext, SessionStateHolder dialbackSessionStateHolder) {
        this.serverRuntimeContext = serverRuntimeContext;
        this.otherServer = otherServer;
        this.dialbackSessionContext = dialbackSessionContext;
        this.dialbackSessionStateHolder = dialbackSessionStateHolder;
    }

    /**
     * Connects to the remote server and blocks until the XMPP handshake completes.
     *
     * <p>Each resolved address is tried in order. An attempt fails if the TCP connect does not
     * succeed within {@code connectTimeout} or the handshake does not complete within
     * {@code xmppHandshakeTimeout}; the connector of a failed attempt is disposed before the
     * next address is tried.
     *
     * @throws RemoteServerNotFoundException if no address could be resolved, or the last failed
     *         attempt ended with an {@link UnresolvedAddressException}
     * @throws RemoteServerTimeoutException if all attempts failed for another reason, or the
     *         calling thread was interrupted while waiting for the handshake (the thread's
     *         interrupt flag is restored in that case)
     */
    public synchronized void start() throws RemoteServerNotFoundException, RemoteServerTimeoutException {
        LOG.info("Starting XMPP server connector to {}", otherServer);

        // make this method synchronous
        final CountDownLatch authenticatedLatch = new CountDownLatch(1);

        XmppEndpointResolver resolver = new XmppEndpointResolver();
        List<ResolvedAddress> addresses = resolver.resolveXmppServer(otherServer.getDomain());
        if (addresses.isEmpty()) {
            // should never happen
            throw new RemoteServerNotFoundException("DNS lookup of remote server failed");
        }

        // NOTE(review): if the remote side drops the socket of the *current* attempt while we
        // wait for the handshake, sessionClosed() closes this connector and a later address
        // can no longer produce a usable connection -- confirm whether retrying is desired.
        boolean successfullyConnected = false;
        Throwable lastException = null;
        for (ResolvedAddress address : addresses) {
            LOG.info("Connecting to XMPP server {} at {}", otherServer, address.getAddress());

            IoConnector attempt = createConnector(authenticatedLatch);
            connector = attempt;

            ConnectFuture connectFuture = attempt.connect(address.getAddress());
            if (connectFuture.awaitUninterruptibly(connectTimeout) && connectFuture.isConnected()) {
                // success on the TCP/IP level, now wait for the XMPP handshake
                try {
                    if (authenticatedLatch.await(xmppHandshakeTimeout, TimeUnit.MILLISECONDS)) {
                        // success, break out of connect loop
                        successfullyConnected = true;
                        break;
                    }
                    // attempt next
                    LOG.warn("XMPP handshake with {} at {} timed out", otherServer, address.getAddress());
                } catch (InterruptedException e) {
                    // do not leak the half-open connection, and keep the interrupt visible to callers
                    abandon(attempt);
                    Thread.currentThread().interrupt();
                    throw new RemoteServerTimeoutException("Connection to " + otherServer + " was interrupted", e);
                }
            }

            lastException = connectFuture.getException();
            LOG.warn("Failed connecting to XMPP server " + otherServer + " at " + address.getAddress(), lastException);
            abandon(attempt);
        }

        if (!successfullyConnected) {
            String exceptionMsg = "Failed to connect to XMPP server at " + otherServer;
            if (lastException instanceof UnresolvedAddressException) {
                throw new RemoteServerNotFoundException(exceptionMsg);
            } else {
                throw new RemoteServerTimeoutException(exceptionMsg);
            }
        }
    }

    /**
     * Gives up a failed connection attempt. The {@code connector} field is cleared before the
     * connector is disposed so that the resulting {@code sessionClosed} callback is recognised
     * as stale and does not close this object.
     */
    private void abandon(IoConnector attempt) {
        connector = null;
        attempt.dispose();
    }

    /**
     * Builds a socket connector with the XMPP codec and stanza logging filters and a fresh
     * {@link ConnectorIoHandler} that releases {@code authenticatedLatch} on authentication.
     */
    private NioSocketConnector createConnector(CountDownLatch authenticatedLatch) {
        NioSocketConnector socketConnector = new NioSocketConnector();
        DefaultIoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
        filterChainBuilder.addLast("xmppCodec", new ProtocolCodecFilter(new XMPPProtocolCodecFactory()));
        filterChainBuilder.addLast("loggingFilter", new StanzaLoggingFilter());
        socketConnector.setFilterChainBuilder(filterChainBuilder);
        socketConnector.setHandler(new ConnectorIoHandler(authenticatedLatch));
        return socketConnector;
    }

    /** Starts the periodic ping timer unless it is already running or the ping module is absent. */
    private void startPinging() {
        // are pings not already running and is the XMPP ping module active?
        if (pingTimer == null && serverRuntimeContext.getModule(XmppPingModule.class) != null) {
            pingTimer = new Timer("pingtimer", true);
            pingTimer.schedule(new PingTask(), pingPeriod, pingPeriod);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.vysper.xmpp.server.s2s.XMPPServerConnector#write(org.apache.vysper.xmpp.stanza.Stanza)
     */
    public void write(Stanza stanza) {
        sessionContext.write(stanza);
    }

    /**
     * Closes the connector: cancels the ping timer, closes the session and disposes the
     * underlying MINA connector. Calls made after the connector has been marked closed return
     * immediately, which also makes the re-entrant call triggered by the session's own
     * {@code sessionClosed} event a no-op.
     */
    public void close() {
        if (closed) {
            return;
        }
        // flag first: closing the session fires sessionClosed(), which calls back into close()
        closed = true;

        LOG.info("XMPP server connector to {} closing", otherServer);

        // each resource may legitimately be absent: the timer only exists once pinging has
        // started, the session context only once a socket was opened
        Timer timer = pingTimer;
        if (timer != null) {
            timer.cancel();
        }

        try {
            MinaBackedSessionContext context = sessionContext;
            if (context != null) {
                context.close();
            }
        } finally {
            IoConnector activeConnector = connector;
            if (activeConnector != null) {
                activeConnector.dispose();
            }
        }

        LOG.info("XMPP server connector to {} closed", otherServer);
    }

    /** Ping callback: the remote server answered; nothing to do. */
    public void pong() {
        // do nothing, all happy
    }

    /** Ping callback: the remote server did not answer in time, so the connector is closed. */
    public void timeout() {
        LOG.debug("XMPP server connector to {} timed out, closing", otherServer);
        close();
    }

    /**
     * @return {@code true} once {@link #close()} has been invoked, either directly, through a
     *         ping timeout or because the socket was closed
     */
    public boolean isClosed() {
        return closed;
    }

    /**
     * MINA I/O handler driving the client side of the server-to-server handshake for one
     * connection attempt.
     */
    private final class ConnectorIoHandler extends IoHandlerAdapter {

        /** Handlers consulted, in order, for every incoming stanza. */
        private final List<StanzaHandler> handlers = Arrays.asList(
            new DbVerifyHandler(),
            new DbResultHandler(),
            new TlsProceedHandler(),
            new FeaturesHandler()
        );

        /** Released once the session reaches the authenticated state. */
        private final CountDownLatch authenticatedLatch;

        private ConnectorIoHandler(CountDownLatch authenticatedLatch) {
            this.authenticatedLatch = authenticatedLatch;
        }

        @Override
        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
            LOG.warn("Exception thrown by XMPP server connector to " + otherServer + ", probably a bug in Vysper", cause);
        }

        /** Returns the first handler whose {@code verify} accepts the stanza, or {@code null}. */
        private StanzaHandler lookupHandler(Stanza stanza) {
            for (StanzaHandler handler : handlers) {
                if (handler.verify(stanza)) {
                    return handler;
                }
            }
            return null;
        }

        /**
         * Handles the TLS "session secured" event by reopening the stream, and dispatches
         * incoming stanzas either to a matching handler or to the stream-start logic.
         *
         * @throws RuntimeException if the message is neither the SSL event nor a stanza
         */
        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            if (message == SslFilter.SESSION_SECURED) {
                // connection secured, send stream opener
                sessionStateHolder.setState(SessionState.ENCRYPTED);

                LOG.info("XMPP server connector to {} secured using TLS", otherServer);
                LOG.debug("XMPP server connector to {} restarting stream", otherServer);

                sessionContext.setIsReopeningXMLStream();

                Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);
                sessionContext.write(opener);
            } else if (message instanceof Stanza) {
                Stanza stanza = (Stanza) message;

                // check for basic stanza handlers
                StanzaHandler handler = lookupHandler(stanza);

                if (handler != null) {
                    ResponseStanzaContainer container = handler.execute(stanza, serverRuntimeContext, false, sessionContext, sessionStateHolder);
                    if (container != null && container.hasResponse()) {
                        sessionContext.write(container.getResponseStanza());
                    }

                    if (sessionStateHolder.getState() == SessionState.AUTHENTICATED) {
                        LOG.info("XMPP server connector to {} authenticated", otherServer);
                        authenticatedLatch.countDown();

                        // connection established, start pinging
                        startPinging();
                    }
                // none of the handlers matched, stream start is handled separately
                } else if ("stream".equals(stanza.getName())) {
                    sessionContext.setSessionId(stanza.getAttributeValue("id"));
                    sessionContext.setInitiatingEntity(otherServer);

                    String version = stanza.getAttributeValue("version");
                    if (version == null) {
                        // old protocol, assume dialback
                        String dailbackId = new DialbackIdGenerator().generate(otherServer, serverRuntimeContext.getServerEnitity(), sessionContext.getSessionId());

                        Stanza dbResult = new StanzaBuilder("result", NamespaceURIs.JABBER_SERVER_DIALBACK, "db")
                            .addAttribute("from", serverRuntimeContext.getServerEnitity().getDomain())
                            .addAttribute("to", otherServer.getDomain())
                            .addText(dailbackId)
                            .build();
                        write(dbResult);
                    }

                    if (dialbackSessionContext != null) {
                        // connector is being used for dialback verification, don't do further authentication
                        sessionContext.putAttribute("DIALBACK_SESSION_CONTEXT", dialbackSessionContext);
                        sessionContext.putAttribute("DIALBACK_SESSION_STATE_HOLDER", dialbackSessionStateHolder);

                        sessionContext.setInitiatingEntity(otherServer);
                        sessionStateHolder.setState(SessionState.AUTHENTICATED);
                        authenticatedLatch.countDown();
                    }
                } else {
                    // TODO other stanzas coming here?
                }
            } else {
                throw new RuntimeException("Only handles SSL events and stanzas");
            }
        }

        /**
         * Closes the connector when its socket goes away. Sessions created by a connection
         * attempt that {@code start()} has already abandoned are ignored, so their late close
         * event cannot tear down a newer attempt.
         */
        @Override
        public void sessionClosed(IoSession session) throws Exception {
            if (session.getService() != connector) {
                LOG.debug("Ignoring closed socket of abandoned connection attempt to {}", otherServer);
                return;
            }

            // Socket was closed, make sure we close the connector
            LOG.info("XMPP server connector socket closed, closing connector");
            close();
        }

        /** Wraps the new socket in a session context and sends the initial stream opener. */
        @Override
        public void sessionOpened(IoSession session) throws Exception {
            sessionContext = new MinaBackedSessionContext(serverRuntimeContext, sessionStateHolder, session);
            sessionStateHolder.setState(SessionState.INITIATED);
            Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);

            sessionContext.write(opener);
        }
    }

    /** Timer task that pings the remote server, reporting the result to the enclosing connector. */
    private class PingTask extends TimerTask {
        @Override
        public void run() {
            XmppPingModule pingModule = serverRuntimeContext.getModule(XmppPingModule.class);
            pingModule.ping(DefaultXMPPServerConnector.this, serverRuntimeContext.getServerEnitity(), otherServer, pingTimeout, DefaultXMPPServerConnector.this);
        }
    }
}