// blob: 6752ee1a893485a8a6f074ad72f302b1c52d3d76 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.vysper.xmpp.server.s2s;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.UnresolvedAddressException;
import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.ssl.SslFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.apache.vysper.mina.MinaBackedSessionContext;
import org.apache.vysper.mina.StanzaLoggingFilter;
import org.apache.vysper.mina.codec.XMPPProtocolCodecFactory;
import org.apache.vysper.xmpp.addressing.Entity;
import org.apache.vysper.xmpp.delivery.failure.RemoteServerNotFoundException;
import org.apache.vysper.xmpp.delivery.failure.RemoteServerTimeoutException;
import org.apache.vysper.xmpp.modules.extension.xep0199_xmppping.XmppPingListener;
import org.apache.vysper.xmpp.modules.extension.xep0199_xmppping.XmppPingModule;
import org.apache.vysper.xmpp.modules.extension.xep0220_server_dailback.DbResultHandler;
import org.apache.vysper.xmpp.modules.extension.xep0220_server_dailback.DbVerifyHandler;
import org.apache.vysper.xmpp.modules.extension.xep0220_server_dailback.DialbackIdGenerator;
import org.apache.vysper.xmpp.protocol.NamespaceURIs;
import org.apache.vysper.xmpp.protocol.ResponseStanzaContainer;
import org.apache.vysper.xmpp.protocol.SessionStateHolder;
import org.apache.vysper.xmpp.protocol.StanzaHandler;
import org.apache.vysper.xmpp.server.ServerRuntimeContext;
import org.apache.vysper.xmpp.server.SessionContext;
import org.apache.vysper.xmpp.server.SessionState;
import org.apache.vysper.xmpp.server.XMPPVersion;
import org.apache.vysper.xmpp.server.response.ServerResponses;
import org.apache.vysper.xmpp.server.s2s.XmppEndpointResolver.ResolvedAddress;
import org.apache.vysper.xmpp.stanza.Stanza;
import org.apache.vysper.xmpp.stanza.StanzaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Default implementation of {@link XMPPServerConnector}.
 * <p>
 * {@link #start()} resolves the addresses of the remote server, tries them in order and
 * blocks until the session has reached the {@link SessionState#AUTHENTICATED} state or
 * every address has failed. Incoming events are processed by an internal MINA I/O handler.
 * If the XMPP ping module is registered in the {@link ServerRuntimeContext}, an
 * authenticated connection is pinged periodically and closed when a ping times out.
 *
 * @author The Apache MINA Project (dev@mina.apache.org)
 */
public class DefaultXMPPServerConnector implements XmppPingListener, XMPPServerConnector {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultXMPPServerConnector.class);

    private final ServerRuntimeContext serverRuntimeContext;

    /** Session context of the current connection; assigned by the I/O handler in sessionOpened(). */
    private MinaBackedSessionContext sessionContext;

    private final Entity otherServer;

    private final SessionStateHolder sessionStateHolder = new SessionStateHolder();

    /** Connector of the current connection attempt; null before start() and after disposal. */
    private IoConnector connector;

    /** Maximum time in milliseconds to wait for the TCP/IP connection. */
    private int connectTimeout = 30000;

    /** Maximum time in milliseconds to wait for the session to become authenticated. */
    private int xmppHandshakeTimeout = 30000;

    /** Delay before the first ping and between subsequent pings, in milliseconds. */
    private int pingPeriod = 30000;

    /** Timeout in milliseconds handed to the ping module for each ping. */
    private int pingTimeout = 10000;

    /** Written by I/O and timer threads, read through isClosed(); volatile for visibility. */
    private volatile boolean closed = false;

    /** Non-null when this connector is only used for dialback verification. */
    private final SessionContext dialbackSessionContext;

    private final SessionStateHolder dialbackSessionStateHolder;

    private Timer pingTimer;

    /**
     * Creates a connector; no connection is made until {@link #start()} is called.
     *
     * @param otherServer the remote server to connect to
     * @param serverRuntimeContext the runtime context of the local server
     * @param dialbackSessionContext session context stored on the new session under
     *            "DIALBACK_SESSION_CONTEXT" when non-null; a non-null value makes the
     *            connector skip further authentication once the remote stream is opened
     * @param dialbackSessionStateHolder state holder stored on the new session under
     *            "DIALBACK_SESSION_STATE_HOLDER" together with the dialback session context
     */
    public DefaultXMPPServerConnector(Entity otherServer, ServerRuntimeContext serverRuntimeContext, SessionContext dialbackSessionContext, SessionStateHolder dialbackSessionStateHolder) {
        this.serverRuntimeContext = serverRuntimeContext;
        this.otherServer = otherServer;
        this.dialbackSessionContext = dialbackSessionContext;
        this.dialbackSessionStateHolder = dialbackSessionStateHolder;
    }

    /**
     * Connect and authenticate the XMPP server connector.
     * <p>
     * Every resolved address is tried in order; the method returns as soon as one of them
     * completes the XMPP handshake within the configured timeouts.
     *
     * @throws RemoteServerNotFoundException if no address could be resolved, or if the last
     *             connection attempt failed with an {@link UnresolvedAddressException}
     * @throws RemoteServerTimeoutException if no address could be connected and authenticated
     *             in time, or if the calling thread was interrupted while waiting for the
     *             handshake (the interrupt status is restored before throwing)
     */
    public synchronized void start() throws RemoteServerNotFoundException, RemoteServerTimeoutException {
        LOG.info("Starting XMPP server connector to {}", otherServer);

        // counted down by the I/O handler once authenticated; waiting on it makes this method synchronous
        final CountDownLatch authenticatedLatch = new CountDownLatch(1);

        XmppEndpointResolver resolver = new XmppEndpointResolver();
        List<ResolvedAddress> addresses = resolver.resolveXmppServer(otherServer.getDomain());
        if (addresses.isEmpty()) {
            // should never happen
            throw new RemoteServerNotFoundException("DNS lookup of remote server failed");
        }
        LOG.info("resolved {} address(es) for {}", addresses.size(), otherServer);

        boolean successfullyConnected = false;
        Throwable lastException = null;
        for (ResolvedAddress address : addresses) {
            final InetSocketAddress ipAddress = address.getAddress();
            LOG.info("Connecting to XMPP server {} at {}", otherServer, ipAddress);

            // keep a local reference: the field may be reset by close() running on another thread
            IoConnector attempt = createConnector(authenticatedLatch);
            connector = attempt;
            ConnectFuture connectFuture = attempt.connect(ipAddress);
            if (connectFuture.awaitUninterruptibly(connectTimeout) && connectFuture.isConnected()) {
                // success on the TCP/IP level, now wait for the XMPP handshake
                LOG.info("XMPP server {} connected at {}", otherServer, ipAddress);
                try {
                    if (authenticatedLatch.await(xmppHandshakeTimeout, TimeUnit.MILLISECONDS)) {
                        // success, break out of connect loop
                        successfullyConnected = true;
                        break;
                    }
                    // attempt next
                    LOG.warn("XMPP handshake with {} at {} timed out", otherServer, ipAddress);
                } catch (InterruptedException e) {
                    // do not leak the connector of the aborted attempt and keep the
                    // interrupt visible to the caller
                    disposeAndNullifyConnector();
                    Thread.currentThread().interrupt();
                    throw new RemoteServerTimeoutException("Connection to " + otherServer + " was interrupted", e);
                }
            }

            lastException = connectFuture.getException();
            LOG.warn("Failed connecting to XMPP server " + otherServer + " at " + ipAddress, connectFuture.getException());
            // NOTE(review): if disposing closes a session opened by this attempt, sessionClosed()
            // will invoke close() and flag this connector as closed even though a later address
            // may still succeed - confirm against MINA's dispose semantics
            disposeAndNullifyConnector();
        }

        if (!successfullyConnected) {
            String exceptionMsg = "Failed to connect to XMPP server at " + otherServer;
            if (lastException instanceof UnresolvedAddressException) {
                throw new RemoteServerNotFoundException(exceptionMsg);
            } else {
                throw new RemoteServerTimeoutException(exceptionMsg);
            }
        }
    }

    /**
     * Disposes the current connector, if any, and resets the field.
     */
    private void disposeAndNullifyConnector() {
        IoConnector localConnector = connector;
        if (localConnector == null) {
            return;
        }
        localConnector.dispose();
        connector = null;
    }

    /**
     * Creates a socket connector with the XMPP codec and stanza logging filters installed.
     *
     * @param authenticatedLatch latch handed to the I/O handler, which counts it down once
     *            the session is authenticated
     * @return the configured, not yet connected connector
     */
    private NioSocketConnector createConnector(CountDownLatch authenticatedLatch) {
        NioSocketConnector connector = new NioSocketConnector();
        DefaultIoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
        filterChainBuilder.addLast("xmppCodec", new ProtocolCodecFilter(new XMPPProtocolCodecFactory()));
        filterChainBuilder.addLast("loggingFilter", new StanzaLoggingFilter());
        connector.setFilterChainBuilder(filterChainBuilder);
        connector.setHandler(new ConnectorIoHandler(authenticatedLatch));
        return connector;
    }

    /**
     * Schedules the periodic ping task, unless it is already scheduled or the XMPP ping
     * module is not registered.
     */
    private void startPinging() {
        // are pings not already running and is the XMPP ping module active?
        if (pingTimer == null && serverRuntimeContext.getModule(XmppPingModule.class) != null) {
            pingTimer = new Timer("pingtimer", true);
            pingTimer.schedule(new PingTask(), pingPeriod, pingPeriod);
        }
    }

    /**
     * {@inheritDoc}
     */
    public void write(Stanza stanza) {
        sessionContext.write(stanza);
    }

    /**
     * {@inheritDoc}
     */
    public void close() {
        try {
            if (!closed) {
                LOG.info("XMPP server connector to {} closing", otherServer);
                if (pingTimer != null) {
                    pingTimer.cancel();
                }
                try {
                    // no session context exists if the connector is closed before a session was opened
                    MinaBackedSessionContext context = sessionContext;
                    if (context != null) {
                        context.close();
                    }
                } finally {
                    // release the connector even if closing the session failed
                    disposeAndNullifyConnector();
                }
                LOG.info("XMPP server connector to {} closed", otherServer);
            }
        } finally {
            closed = true;
        }
    }

    /**
     * {@inheritDoc}
     */
    public void pong() {
        // do nothing, all happy
    }

    /**
     * {@inheritDoc}
     */
    public void timeout() {
        LOG.debug("XMPP server connector to {} timed out, closing", otherServer);
        close();
    }

    /**
     * Is this XMPP server connector closed?
     * @return true if the connector is closed
     */
    public boolean isClosed() {
        return closed;
    }

    /**
     * MINA I/O handler driving the client side of the server-to-server stream: it opens the
     * stream, dispatches incoming stanzas to the dialback/TLS/features handlers and counts
     * down the latch once the session is authenticated.
     */
    private final class ConnectorIoHandler extends IoHandlerAdapter {

        /** Handlers tried in order for every incoming stanza; the first one that verifies wins. */
        private final List<StanzaHandler> handlers = Arrays.asList(
                new DbVerifyHandler(),
                new DbResultHandler(),
                new TlsProceedHandler(),
                new FeaturesHandler()
        );

        private final CountDownLatch authenticatedLatch;

        private ConnectorIoHandler(CountDownLatch authenticatedLatch) {
            this.authenticatedLatch = authenticatedLatch;
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
            if (cause instanceof IOException) {
                // most specific SSL failure first
                if (cause instanceof javax.net.ssl.SSLHandshakeException) {
                    LOG.warn("failed to complete SSL handshake with server {}: {}", otherServer, cause.getMessage());
                } else if (cause instanceof javax.net.ssl.SSLException) {
                    LOG.warn("failure in SSL with server {}: {}", otherServer, cause.getMessage());
                } else {
                    LOG.info("I/O exception with server {}: {}", otherServer, cause.getMessage());
                }
                // any I/O failure closes the connector
                close();
            } else {
                LOG.warn("Exception {} thrown by XMPP server connector to " + otherServer + ", probably a bug in Vysper: {}", cause.getClass().getName(), cause.getMessage());
            }
        }

        /**
         * Finds the first registered handler that accepts the stanza.
         *
         * @param stanza the received stanza
         * @return the matching handler, or null if none of the handlers verifies the stanza
         */
        private StanzaHandler lookupHandler(Stanza stanza) {
            for (StanzaHandler handler : handlers) {
                if (handler.verify(stanza)) {
                    return handler;
                }
            }
            return null;
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            if (message == SslFilter.SESSION_SECURED) {
                // connection secured, send stream opener
                sessionStateHolder.setState(SessionState.ENCRYPTED);
                LOG.info("XMPP server connector to {} secured using TLS", otherServer);
                LOG.debug("XMPP server connector to {} restarting stream", otherServer);
                sessionContext.setIsReopeningXMLStream();
                Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);
                sessionContext.write(opener);
            } else if (message == SslFilter.SESSION_UNSECURED) {
                // unsecured, closing
                close();
            } else if (message instanceof Stanza) {
                Stanza stanza = (Stanza) message;
                // check for basic stanza handlers
                StanzaHandler handler = lookupHandler(stanza);
                if (handler != null) {
                    ResponseStanzaContainer container = handler.execute(stanza, serverRuntimeContext, false, sessionContext, sessionStateHolder);
                    if (container != null && container.hasResponse()) {
                        sessionContext.write(container.getResponseStanza());
                    }
                    if (sessionStateHolder.getState() == SessionState.AUTHENTICATED) {
                        LOG.info("XMPP server connector to {} authenticated", otherServer);
                        authenticatedLatch.countDown();
                        // connection established, start pinging
                        startPinging();
                    }
                // none of the handlers matched, stream start is handled separately
                } else if (stanza.getName().equals("stream")) {
                    sessionContext.setSessionId(stanza.getAttributeValue("id"));
                    sessionContext.setInitiatingEntity(otherServer);
                    String version = stanza.getAttributeValue("version");
                    if (version == null) {
                        // old protocol, assume dialback
                        String dialbackId = new DialbackIdGenerator().generate(otherServer, serverRuntimeContext.getServerEnitity(), sessionContext.getSessionId());
                        Stanza dbResult = new StanzaBuilder("result", NamespaceURIs.JABBER_SERVER_DIALBACK, "db")
                                .addAttribute("from", serverRuntimeContext.getServerEnitity().getDomain())
                                .addAttribute("to", otherServer.getDomain())
                                .addText(dialbackId)
                                .build();
                        write(dbResult);
                    }
                    if (dialbackSessionContext != null) {
                        // connector is being used for dialback verification, don't do further authentication
                        sessionContext.putAttribute("DIALBACK_SESSION_CONTEXT", dialbackSessionContext);
                        sessionContext.putAttribute("DIALBACK_SESSION_STATE_HOLDER", dialbackSessionStateHolder);
                        sessionContext.setInitiatingEntity(otherServer);
                        sessionStateHolder.setState(SessionState.AUTHENTICATED);
                        authenticatedLatch.countDown();
                    }
                } else {
                    // TODO other stanzas coming here?
                    LOG.warn("unhandled stanza in S2S ConnectorIoHandler: " + message);
                }
            } else {
                // guard against a null message so the intended exception is raised, not an NPE
                Class<?> messageType = (message == null) ? null : message.getClass();
                throw new RuntimeException("Only handles SSL events and stanzas, got: " + messageType);
            }
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void sessionClosed(IoSession session) throws Exception {
            // Socket was closed, make sure we close the connector
            LOG.info("XMPP server connector socket closed, closing connector");
            close();
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void sessionOpened(IoSession session) throws Exception {
            LOG.info("XMPP server session opened to {}", otherServer);
            sessionContext = new MinaBackedSessionContext(serverRuntimeContext, sessionStateHolder, session);
            sessionStateHolder.setState(SessionState.INITIATED);
            Stanza opener = new ServerResponses().getStreamOpenerForServerConnector(serverRuntimeContext.getServerEnitity(), otherServer, XMPPVersion.VERSION_1_0, sessionContext);
            sessionContext.write(opener);
        }
    }

    /**
     * Timer task that pings the remote server through the XMPP ping module, with the
     * enclosing connector passed both as the connector argument and as the ping listener.
     */
    private class PingTask extends TimerTask {
        @Override
        public void run() {
            XmppPingModule pingModule = serverRuntimeContext.getModule(XmppPingModule.class);
            LOG.info("pinging federated XMPP server {}", otherServer);
            pingModule.ping(DefaultXMPPServerConnector.this, serverRuntimeContext.getServerEnitity(), otherServer, pingTimeout, DefaultXMPPServerConnector.this);
        }
    }
}