blob: 7fdb0a55cdad4ffb8a611e1a7274f9e86110d60d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.vysper.xmpp.delivery.inbound;
import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.vysper.compliance.SpecCompliant;
import org.apache.vysper.xmpp.addressing.Entity;
import org.apache.vysper.xmpp.addressing.EntityImpl;
import org.apache.vysper.xmpp.delivery.OfflineStanzaReceiver;
import org.apache.vysper.xmpp.delivery.StanzaRelay;
import org.apache.vysper.xmpp.delivery.failure.DeliveryException;
import org.apache.vysper.xmpp.delivery.failure.DeliveryFailureStrategy;
import org.apache.vysper.xmpp.delivery.failure.ServiceNotAvailableException;
import org.apache.vysper.xmpp.protocol.NamespaceURIs;
import org.apache.vysper.xmpp.server.InternalServerRuntimeContext;
import org.apache.vysper.xmpp.server.InternalSessionContext;
import org.apache.vysper.xmpp.server.resources.ManagedThreadPool;
import org.apache.vysper.xmpp.server.resources.ManagedThreadPoolUtil;
import org.apache.vysper.xmpp.server.s2s.XMPPServerConnector;
import org.apache.vysper.xmpp.stanza.Stanza;
import org.apache.vysper.xmpp.stanza.StanzaBuilder;
import org.apache.vysper.xmpp.stanza.XMPPCoreStanza;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* relays all 'incoming' stanzas to internal sessions, acts as a 'stage' by using a ThreadPoolExecutor
* 'incoming' here means:
* a. stanzas coming in from other servers
* b. stanzas coming from other (local) sessions and are targeted to clients on this server
*
* @author The Apache MINA Project (dev@mina.apache.org)
*/
public class DeliveringExternalInboundStanzaRelay implements StanzaRelay, ManagedThreadPool {
final Logger logger = LoggerFactory.getLogger(DeliveringExternalInboundStanzaRelay.class);
private static class RejectedDeliveryHandler implements RejectedExecutionHandler {
DeliveringExternalInboundStanzaRelay relay;
Logger logger;
private RejectedDeliveryHandler(DeliveringExternalInboundStanzaRelay relay, Logger logger) {
this.relay = relay;
this.logger = logger;
}
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
logger.info("relaying of external inbound stanza has been rejected");
}
}
protected ExecutorService executor;
protected OfflineStanzaReceiver offlineStanzaReceiver = null;
protected InternalServerRuntimeContext serverRuntimeContext = null;
protected long lastCompleted = 0;
protected long lastDumpTimestamp = 0;
public DeliveringExternalInboundStanzaRelay() {
int coreThreadCount = 10;
int maxThreadCount = 20;
int threadTimeoutSeconds = 2 * 60;
this.executor = new ThreadPoolExecutor(coreThreadCount, maxThreadCount, threadTimeoutSeconds, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new RejectedDeliveryHandler(this, logger));
}
/*package*/ DeliveringExternalInboundStanzaRelay(ExecutorService executor) {
this.executor = executor;
}
public void setServerRuntimeContext(InternalServerRuntimeContext serverRuntimeContext) {
this.serverRuntimeContext = serverRuntimeContext;
}
public void setMaxThreadCount(int maxThreadPoolCount) {
if (!(executor instanceof ThreadPoolExecutor)) {
throw new IllegalStateException("cannot set max thread count for " + executor.getClass());
}
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)executor;
threadPoolExecutor.setCorePoolSize(maxThreadPoolCount);
threadPoolExecutor.setMaximumPoolSize(2*maxThreadPoolCount);
}
public void setThreadTimeoutSeconds(int threadTimeoutSeconds) {
if (!(executor instanceof ThreadPoolExecutor)) {
throw new IllegalStateException("cannot set thread timeout for " + executor.getClass());
}
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)executor;
threadPoolExecutor.setKeepAliveTime(threadTimeoutSeconds, TimeUnit.SECONDS);
}
public void dumpThreadPoolInfo(Writer writer) throws IOException {
if (!(executor instanceof ThreadPoolExecutor)) {
throw new IllegalStateException("cannot dump info for " + executor.getClass());
}
ThreadPoolExecutor pool = (ThreadPoolExecutor)executor;
final long now = System.currentTimeMillis();
writer.append("==== externalRelay:").append("\n");
ManagedThreadPoolUtil.writeThreadPoolInfo(writer, pool);
final long completedTaskCount = pool.getCompletedTaskCount();
if (lastDumpTimestamp > 0) {
writer.append("throughput=\t").append(Long.toString(completedTaskCount - lastCompleted))
.append(" per ").append(Long.toString(now - lastDumpTimestamp)).append("\n");
}
lastDumpTimestamp = now;
lastCompleted = completedTaskCount;
}
public void relay(InternalSessionContext sessionContext, Entity receiver, Stanza stanza, DeliveryFailureStrategy deliveryFailureStrategy)
throws DeliveryException {
if (!isRelaying()) {
throw new ServiceNotAvailableException("external inbound relay is not relaying");
}
// rewrite the namespace into the jabber:server namespace
stanza = StanzaBuilder.rewriteNamespace(stanza, NamespaceURIs.JABBER_CLIENT, NamespaceURIs.JABBER_SERVER);
XMPPCoreStanza coreStanza = XMPPCoreStanza.getWrapper(stanza);
if(coreStanza != null) {
Future<RelayResult> resultFuture = executor.submit(new OutboundRelayCallable(coreStanza, deliveryFailureStrategy));
} else {
// ignore non-core stanzas
}
}
public boolean isRelaying() {
return !executor.isShutdown();
}
public void stop() {
this.executor.shutdown();
}
private class OutboundRelayCallable implements Callable<RelayResult> {
private XMPPCoreStanza stanza;
private DeliveryFailureStrategy deliveryFailureStrategy;
OutboundRelayCallable(XMPPCoreStanza stanza, DeliveryFailureStrategy deliveryFailureStrategy) {
this.stanza = stanza;
this.deliveryFailureStrategy = deliveryFailureStrategy;
}
public RelayResult call() {
RelayResult relayResult = deliver();
if (relayResult == null || !relayResult.hasProcessingErrors()) {
return relayResult;
} else {
return runFailureStrategy(relayResult);
}
}
private RelayResult runFailureStrategy(RelayResult relayResult) {
if (deliveryFailureStrategy != null) {
try {
deliveryFailureStrategy.process(stanza, relayResult.getProcessingErrors());
} catch (DeliveryException e) {
return new RelayResult(e);
} catch (RuntimeException e) {
return new RelayResult(new DeliveryException(e));
}
}
// TODO throw relayResult.getProcessingError() in some appropriate context
return relayResult;
}
/**
* @return
*/
@SpecCompliant(spec = "draft-ietf-xmpp-3920bis-22", section = "10.4", status = SpecCompliant.ComplianceStatus.IN_PROGRESS, coverage = SpecCompliant.ComplianceCoverage.COMPLETE)
protected RelayResult deliver() {
try {
RelayResult relayResult = new RelayResult();
XMPPServerConnector connector = serverRuntimeContext.getServerConnectorRegistry().connect(EntityImpl.parseUnchecked(stanza.getTo().getDomain()));
connector.write(stanza);
return relayResult.setProcessed();
} catch (DeliveryException e) {
return new RelayResult(e);
}
}
}
}