blob: 3e03ef7e875c01e57232d0b36a5aff9d306b4a4b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.waveprotocol.wave.federation.xmpp;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.inject.Inject;
import com.typesafe.config.Config;
import org.dom4j.Element;
import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
import org.waveprotocol.wave.federation.FederationErrors;
import org.xmpp.packet.IQ;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Implementation of XMPP Discovery. Provides methods to respond to incoming disco requests
 * (via {@link XmppManager}), as well as outgoing disco via {@link #discoverRemoteJid}.
 *
 * <p>Remote discovery state is kept in a per-domain cache of {@link RemoteDisco} objects. Entries
 * expire a configured number of hours after they are written, and are additionally invalidated
 * by {@link #discoverRemoteJid} when the entry reports that its own TTL has been exceeded.
 *
 * @author arb@google.com (Anthony Baxter)
 * @author thorogood@google.com (Sam Thorogood)
 */
public class XmppDisco {

  private static final Logger LOG = Logger.getLogger(XmppDisco.class.getCanonicalName());

  /** Number of disco attempts started, keyed by remote domain. Counters are created on demand. */
  public static final LoadingCache<String, AtomicLong> statDiscoStarted =
      CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() {
        @Override
        public AtomicLong load(@SuppressWarnings("NullableProblems") String domain) {
          return new AtomicLong();
        }
      });

  /** Per-domain discovery state; a missing entry is created (and counted) on first access. */
  private final LoadingCache<String, RemoteDisco> discoRequests;
  private final String serverDescription;

  /** Set via {@link #setManager}; remains null until then. */
  private XmppManager manager = null;

  // Accessed by XmppFederationHostForDomain.
  final long failExpirySecs;
  final long successExpirySecs;
  final long discoExpirationHours;
  final String discoInfoCategory;
  final String discoInfoType;

  /**
   * Constructor. Note that {@link #setManager} must be called before this class is ready to use.
   *
   * @param config configuration supplying the {@code federation.*} settings read below
   */
  @Inject
  public XmppDisco(Config config) {
    this.serverDescription = config.getString("federation.xmpp_server_description");
    this.discoInfoCategory = config.getString("federation.disco_info_category");
    this.discoInfoType = config.getString("federation.disco_info_type");
    this.failExpirySecs =
        config.getDuration("federation.xmpp_disco_failed_expiry", TimeUnit.SECONDS);
    this.successExpirySecs =
        config.getDuration("federation.xmpp_disco_successful_expiry", TimeUnit.SECONDS);
    this.discoExpirationHours =
        config.getDuration("federation.disco_expiration", TimeUnit.HOURS);

    //noinspection NullableProblems
    discoRequests =
        CacheBuilder.newBuilder()
            .expireAfterWrite(discoExpirationHours, TimeUnit.HOURS)
            .build(new CacheLoader<String, RemoteDisco>() {
              @Override
              public RemoteDisco load(String domain) throws Exception {
                // Every cache miss starts a fresh disco attempt, so count it here.
                statDiscoStarted.get(domain).incrementAndGet();
                // The manager field is read at load time, not at construction time, so a
                // manager supplied later through setManager is the one that gets used.
                return new RemoteDisco(manager, domain, failExpirySecs, successExpirySecs);
              }
            });
  }

  /**
   * Set the manager instance for this class. Must be invoked before any other
   * methods are used.
   *
   * @param manager an XmppManager instance
   */
  public void setManager(XmppManager manager) {
    this.manager = manager;
  }

  /**
   * Handles a disco info get from a foreign source. A remote server is trying to ask us what we
   * support. Send back a message identifying as a wave component.
   *
   * @param iq the IQ packet.
   * @param responseCallback callback used to send response
   */
  void processDiscoInfoGet(IQ iq, PacketCallback responseCallback) {
    IQ response = IQ.createResultIQ(iq);
    Element query = response.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_INFO);

    query.addElement("identity")
        .addAttribute("category", discoInfoCategory)
        .addAttribute("type", discoInfoType)
        .addAttribute("name", serverDescription);

    query.addElement("feature")
        .addAttribute("var", XmppNamespace.NAMESPACE_WAVE_SERVER);

    responseCallback.run(response);
  }

  /**
   * Handles a disco items get from a foreign XMPP agent. No useful responses, since we're not a
   * domain on its own: just the wave component.
   *
   * @param iq the IQ packet.
   * @param responseCallback callback used to send response
   */
  void processDiscoItemsGet(IQ iq, PacketCallback responseCallback) {
    IQ response = IQ.createResultIQ(iq);
    // An empty query element is the whole answer: there are no items to report.
    response.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_ITEMS);
    responseCallback.run(response);
  }

  /**
   * Attempt to discover the remote JID for this domain. Hands control to {@link RemoteDisco}.
   *
   * @param remoteDomain the domain to discover
   * @param callback a callback to trigger when disco completes
   * @throws NullPointerException if {@link #setManager} has not been called yet
   * @throws RuntimeException wrapping the {@link ExecutionException} if the cache fails to
   *     create the {@link RemoteDisco} for the domain
   */
  public void discoverRemoteJid(String remoteDomain, SuccessFailCallback<String, String> callback) {
    // The reference under test goes first and the message second; with the arguments the other
    // way round the check inspects the (never-null) message and can never fail.
    Preconditions.checkNotNull(manager, "Must call setManager first");

    RemoteDisco disco = discoRequests.getIfPresent(remoteDomain);
    // This is a race condition, but we don't care if we lose it, because the ttl timestamp
    // won't be exceeded in that case.
    if (disco != null && disco.ttlExceeded()) {
      if (LOG.isLoggable(Level.FINE)) {
        LOG.fine("discoverRemoteJid for " + remoteDomain + ": result ttl exceeded.");
      }
      // TODO(arb): should we expose the disco cache somehow for debugging?
      discoRequests.invalidate(remoteDomain);
    }

    try {
      // get() re-creates the entry through the CacheLoader if it was missing or just invalidated.
      discoRequests.get(remoteDomain).discoverRemoteJID(callback);
    } catch (ExecutionException ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Inject a predetermined result into the disco results map. If the passed jid is null, generate
   * an error/not-found case.
   *
   * @param domain remote domain
   * @param jid remote JID
   * @throws IllegalStateException if there is already a result for this domain
   */
  @VisibleForTesting
  void testInjectInDomainToJidMap(String domain, String jid) {
    FederationError error = null;
    if (jid == null) {
      error = FederationErrors.badRequest("Fake injected error");
    }
    RemoteDisco disco = discoRequests.getIfPresent(domain);
    Preconditions.checkState(disco == null);
    discoRequests.put(domain, new RemoteDisco(domain, jid, error));
  }

  /**
   * Determine whether a request for the given domain is pending. Uses a non-loading lookup, so
   * calling this never creates a cache entry.
   *
   * @param domain remote domain
   * @return true if an entry exists for the domain and it reports a pending request
   * @throws ExecutionException declared for compatibility with existing callers; the current
   *     implementation does not throw it
   */
  @VisibleForTesting
  boolean isDiscoRequestPending(String domain) throws ExecutionException {
    RemoteDisco disco = discoRequests.getIfPresent(domain);
    return disco != null && disco.isRequestPending();
  }

  /**
   * Determine whether the disco request for the given domain has been touched or is at all
   * available.
   *
   * @param domain remote domain
   * @return true if the cache currently holds an entry for the domain
   */
  @VisibleForTesting
  boolean isDiscoRequestAvailable(String domain) {
    return discoRequests.getIfPresent(domain) != null;
  }
}