WAVE-438 - Removes XMPP federation implementation along with relevant resources and unit tests.
Fixes issue with a test when running with Java1.8.
diff --git a/README.md b/README.md
index b33916a..4572e41 100644
--- a/README.md
+++ b/README.md
@@ -160,17 +160,6 @@
- if a jar is unable to be unzipped with wave:extractApi then delete the jar from your cache and try again.
You may need to restart. If problem persists let the newsgroup know or create an issue on Jira.
-To config your server a default configuration is provided by reference.conf,
-this can be overwritten by application.conf with custom values.
-
-To enable federation the following must be run.
-
-To create a simple configuration run:
- `./gradlew prosody-config`
-
-To override default values pass them to the ant script.
-For example, to override wave\_server\_domain run:
-`./gradlew prosody-config -Dwave_server_domain=example.com`
Take a look at the reference.conf to learn about configuration and possible/default values.
The server can be started (on Linux/MacOS) by running
diff --git a/wave/build.gradle b/wave/build.gradle
index 71ee962..f09b059 100644
--- a/wave/build.gradle
+++ b/wave/build.gradle
@@ -30,7 +30,7 @@
/* Meta Data Info */
def title = 'Apache Wave Server'
def vendor = 'The Apache Software Foundation'
-version = "0.4.1"
+version = "0.4.2"
mainClassName = "org.waveprotocol.box.server.ServerMain"
applicationDefaultJvmArgs = [
"-Xmx1024M",
@@ -166,9 +166,6 @@
[group: "org.eclipse.jetty.websocket", name: "websocket-common", version: "9.2.14.v20151106"], // [?, ?]
[group: "org.eclipse.jetty.websocket", name: "websocket-server", version: "9.2.14.v20151106"], // [?, ?]
[group: "org.eclipse.jetty.websocket", name: "websocket-servlet", version: "9.2.14.v20151106"], // [?, ?]
- [group: "org.gnu.inet", name: "libidn", version: "1.15"], // [?, ?]
- [group: "org.igniterealtime", name: "tinder", version: "1.2.3"], // [1/2016, 6/2016]
- [group: "org.igniterealtime.whack", name: "core", version: "2.0.0"], // [1/2016, 6/2016]
[group: "org.jdom", name: "jdom", version: "1.1.3"], // [?, ?]
[group: "org.mongodb", name: "mongo-java-driver", version: "2.11.2"], // [?, ?]
[group: "org.slf4j", name: "slf4j-api", version: "1.6.1"], // [?, ?]
@@ -507,8 +504,6 @@
testMongo.mustRunAfter compileJava, test
testLarge.mustRunAfter test
-ant.importBuild 'config/server-config.xml'
-
//=============================================================================
// Custom UberJar Implementation
// Author Note: this custom implementation should be replaced by the shadow
diff --git a/wave/config/reference.conf b/wave/config/reference.conf
index 425a576..30cf51f 100644
--- a/wave/config/reference.conf
+++ b/wave/config/reference.conf
@@ -207,13 +207,11 @@
clientauth_cert_domain : ""
}
+# Please note that currently Wave in a Box server has no Federation implementation.
federation {
# Federation Configuration for the Wave in a Box server
enable_federation : false
- # These will probably need to be changed
- xmpp_server_secret : secret
-
# The PKCS#8-PEM-encoded private key.
certificate_private_key : "local.net.key"
@@ -228,33 +226,6 @@
# The domain for which the certificate was issued.
certificate_domain : "local.net"
- xmpp_component_name : wave
-
- # This server's local JID
- xmpp_jid : "wave.local.net"
-
- xmpp_server_description : "Wave in a Box"
-
- disco_info_category : "collaboration"
-
- disco_info_type : "apache-wave"
-
- xmpp_server_hostname : "local.net"
-
- xmpp_server_component_port : 5275
-
- # How long to cache failed disco results.
- xmpp_disco_failed_expiry : 300s
-
- # How long to cache successful disco results.
- xmpp_disco_successful_expiry : 7200s
-
- disco_expiration : 6h
-
- # Set XMPP_SERVER_IP to localhost if the XMPP and Wave in a Box servers are
- # running on the same host
- xmpp_server_ip : localhost
-
# Set true to disable the verification of signed deltas
waveserver_disable_verification : true
diff --git a/wave/config/server-config.xml b/wave/config/server-config.xml
deleted file mode 100644
index deb5f3b..0000000
--- a/wave/config/server-config.xml
+++ /dev/null
@@ -1,65 +0,0 @@
-<!--
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- -->
-<project name="server config" basedir="../" default="prosody-config">
- <description>Creates the server configuration file.</description>
-
- <property name="wave_server_domain" value="local.net" />
- <property name="xmpp_server_secret" value="opensesame" />
- <property name="certificate_private_key" value="${wave_server_domain}.key" />
- <property name="certificate_files" value="${wave_server_domain}.crt,sub.class1.server.ca.pem,ca.pem" />
- <property name="certificate_domain" value="${wave_server_domain}" />
- <property name="xmpp_component_name" value="wave" />
- <property name="xmpp_jid" value="${xmpp_component_name}.${wave_server_domain}" />
- <property name="xmpp_server_description" value=""Wave in a Box"" />
- <property name="xmpp_server_hostname" value="${wave_server_domain}" />
- <property name="xmpp_server_component_port" value="5275" />
- <property name="xmpp_server_to_server_port" value="5269" />
- <property name="xmpp_server_ping" value="wavesandbox.com" />
- <property name="xmpp_server_ip" value="${xmpp_server_hostname}" />
- <property name="waveserver_disable_verification" value="false" />
- <property name="waveserver_disable_signer_verification" value="false" />
-
-
- <target name="prosody-config"
- description="Run to create the prosody configuration files.
- ant -f server-config.xml prosody-config">
- <echo>Generating ${certificate_domain}.cfg.lua</echo>
- <copy file="${certificate_domain}.cfg.lua"
- tofile="${certificate_domain}.cfg.lua.old"
- overwrite="true"
- failonerror="false" />
- <copy file="prosody.cfg.lua.example" tofile="${certificate_domain}.cfg.lua" overwrite="true">
- <filterchain>
- <replacetokens>
- <token key="BASEDIR" value="${basedir}" />
- <token key="XMPP_SERVER_SECRET" value="${xmpp_server_secret}" />
- <token key="CERTIFICATE_PRIVATE_KEY" value="${certificate_private_key}" />
- <token key="CERTIFICATE_DOMAIN" value="${certificate_domain}" />
- <token key="XMPP_JID" value="${xmpp_jid}" />
- <token key="XMPP_SERVER_DESCRIPTION" value="${xmpp_server_description}" />
- <token key="XMPP_SERVER_COMPONENT_PORT" value="${xmpp_server_component_port}" />
- <token key="XMPP_SERVER_TO_SERVER_PORT" value="${xmpp_server_to_server_port}" />
- </replacetokens>
- </filterchain>
- </copy>
- <echo>Please, manually copy ${certificate_domain}.cfg.lua to your prosody configuration directory.</echo>
- <echo>E.g. sudo cp ${certificate_domain}.cfg.lua /etc/prosody/conf.d/${certificate_domain}.cfg.lua</echo>
- <echo>Additionally, ensure your ${certificate_domain} SRV record points to port ${xmpp_server_to_server_port}</echo>
- </target>
-</project>
diff --git a/wave/src/main/gxp/org/waveprotocol/box/server/gxp/AuthenticationPage.gxp b/wave/src/main/gxp/org/waveprotocol/box/server/gxp/AuthenticationPage.gxp
index d8791d4..d188b05 100644
--- a/wave/src/main/gxp/org/waveprotocol/box/server/gxp/AuthenticationPage.gxp
+++ b/wave/src/main/gxp/org/waveprotocol/box/server/gxp/AuthenticationPage.gxp
@@ -106,7 +106,6 @@
<li>
This project lets developers and
enterprise users run wave servers and host waves on their own hardware.
- And then share those waves with other wave servers.
</li>
</ul>
</p>
diff --git a/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java b/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java
index d258f9c..727ad44 100644
--- a/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java
+++ b/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java
@@ -61,7 +61,6 @@
import org.waveprotocol.wave.crypto.CertPathStore;
import org.waveprotocol.wave.federation.FederationTransport;
import org.waveprotocol.wave.federation.noop.NoOpFederationModule;
-import org.waveprotocol.wave.federation.xmpp.XmppFederationModule;
import org.waveprotocol.wave.model.version.HashedVersionFactory;
import org.waveprotocol.wave.model.wave.ParticipantIdUtil;
import org.waveprotocol.wave.util.logging.Log;
@@ -108,10 +107,9 @@
injector = injector.createChildInjector(profilingModule, executorsModule);
Config config = injector.getInstance(Config.class);
- boolean enableFederation = config.getBoolean("federation.enable_federation");
Module serverModule = injector.getInstance(ServerModule.class);
- Module federationModule = buildFederationModule(injector, enableFederation);
+ Module federationModule = buildFederationModule(injector);
Module robotApiModule = new RobotApiModule();
PersistenceModule persistenceModule = injector.getInstance(PersistenceModule.class);
Module searchModule = injector.getInstance(SearchModule.class);
@@ -140,15 +138,9 @@
server.startWebSocketServer(injector);
}
- private static Module buildFederationModule(Injector settingsInjector, boolean enableFederation)
+ private static Module buildFederationModule(Injector settingsInjector)
throws ConfigurationException {
- Module federationModule;
- if (enableFederation) {
- federationModule = settingsInjector.getInstance(XmppFederationModule.class);
- } else {
- federationModule = settingsInjector.getInstance(NoOpFederationModule.class);
- }
- return federationModule;
+ return settingsInjector.getInstance(NoOpFederationModule.class);
}
private static void initializeServer(Injector injector, String waveDomain)
diff --git a/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorAnnotations.java b/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorAnnotations.java
index a002c76..5621610 100644
--- a/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorAnnotations.java
+++ b/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorAnnotations.java
@@ -80,11 +80,6 @@
public @interface RobotGatewayExecutor {
}
- @Retention(RUNTIME)
- @BindingAnnotation
- public @interface XmppExecutor {
- }
-
@BindingAnnotation
public @interface SolrExecutor {
}
diff --git a/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorsModule.java b/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorsModule.java
index bfcd345..2d8e65e 100644
--- a/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorsModule.java
+++ b/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorsModule.java
@@ -137,15 +137,6 @@
@Provides
@Singleton
- @XmppExecutor
- protected ScheduledExecutorService provideXmppExecutor(
- Provider<ScheduledRequestScopeExecutor> executorProvider) {
- return provideScheduledThreadPoolExecutor(executorProvider, 1, XmppExecutor.class
- .getSimpleName());
- }
-
- @Provides
- @Singleton
@SolrExecutor
protected Executor provideSolrExecutor(Provider<RequestScopeExecutor> executorProvider,
Config config) {
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/Base64Util.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/Base64Util.java
deleted file mode 100644
index 6ab6569..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/Base64Util.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.protobuf.AbstractMessageLite;
-import com.google.protobuf.ByteString;
-
-import org.apache.commons.codec.binary.Base64;
-
-import java.nio.charset.Charset;
-
-/**
- * Utility class for encoding and decoding ByteStrings, byte arrays and encoding
- * generic protocol buffers.
- *
- * @author arb@google.com (Anthony Baxter)
- * @author thorogood@google.com (Sam Thorogood)
- */
-public final class Base64Util {
-
- // Character set for all encoding and decoding. Base64 can be correctly
- // represented using UTF-8.
- private static final Charset CHAR_SET = Charset.forName("UTF-8");
-
- /**
- * Utility class only, cannot be instantiated.
- */
- private Base64Util() {
- }
-
- public static String encode(ByteString bs) {
- return new String(Base64.encodeBase64(bs.toByteArray()), CHAR_SET);
- }
-
- public static String encode(byte[] ba) {
- return new String(Base64.encodeBase64(ba), CHAR_SET);
- }
-
- public static String encode(AbstractMessageLite message) {
- return new String(Base64.encodeBase64(message.toByteArray()), CHAR_SET);
- }
-
- public static byte[] decodeFromArray(String str) {
- return Base64.decodeBase64(str.getBytes(CHAR_SET));
- }
-
- public static ByteString decode(String str) {
- return ByteString.copyFrom(Base64.decodeBase64(str.getBytes(CHAR_SET)));
- }
-}
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/ComponentPacketTransport.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/ComponentPacketTransport.java
deleted file mode 100644
index a98dbbc..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/ComponentPacketTransport.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
-import org.jivesoftware.whack.ExternalComponentManager;
-import org.xmpp.component.Component;
-import org.xmpp.component.ComponentException;
-import org.xmpp.component.ComponentManager;
-import org.xmpp.packet.JID;
-import org.xmpp.packet.Packet;
-
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.logging.Logger;
-
-/**
- * Talks to a XMPP server using the Jabber Component Protocol (XEP-0114).
- *
- * Implements {@link OutgoingPacketTransport} allowing users to send packets,
- * and accepts an {@link IncomingPacketHandler} which can process incoming
- * packets.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public class ComponentPacketTransport implements Component, OutgoingPacketTransport {
- private static final Logger LOG =
- Logger.getLogger(ComponentPacketTransport.class.getCanonicalName());
-
- private final IncomingPacketHandler handler;
- private final String componentName;
- private final String serverDomain;
- private final String serverSecret;
- private final String serverAddress;
- private final int serverPort;
-
- // Contains packets queued but not sent (while offline).
- private final Queue<Packet> queuedPackets;
-
- // Object used to lock around online/offline state changes.
- private final Object connectionLock = new Object();
-
- private ExternalComponentManager componentManager = null;
- private boolean connected = false;
-
- @Inject
- public ComponentPacketTransport(IncomingPacketHandler handler, Config config) {
- this.handler = handler;
- this.componentName = config.getString("federation.xmpp_component_name");
- this.serverDomain = config.getString("federation.xmpp_server_hostname");
- this.serverSecret = config.getString("federation.xmpp_server_secret");
- this.serverAddress = config.getString("federation.xmpp_server_ip");
- this.serverPort = config.getInt("federation.xmpp_server_component_port");
-
- queuedPackets = new LinkedList<>();
- }
-
- /**
- * Bind the component to the XMPP server.
- *
- * @throws ComponentException if the component couldn't talk to the server
- */
- public void run() throws ComponentException {
- componentManager = new ExternalComponentManager(serverAddress, serverPort);
- componentManager.setDefaultSecretKey(serverSecret);
- componentManager.setServerName(serverDomain);
-
- // Register this component with the manager.
- componentManager.addComponent(componentName, this);
- }
-
- @Override
- public void sendPacket(Packet packet) {
- synchronized (connectionLock) {
- if (connected) {
- componentManager.sendPacket(this, packet);
- } else {
- queuedPackets.add(packet);
- }
- }
- }
-
- @Override
- public String getDescription() {
- return "Wave in a Box Server";
- }
-
- @Override
- public String getName() {
- return componentName;
- }
-
- @Override
- public void initialize(JID jid, ComponentManager componentManager) {
- // TODO(thorogood): According to XEP-0114, the only valid JID here is the
- // same JID we attempt to connect to the XMPP server with.
- LOG.info("Initializing with JID: " + jid);
- }
-
- /**
- * {@inheritDoc}
- *
- * Pass the incoming on-the-wire packet onto the incoming handler.
- */
- @Override
- public void processPacket(Packet packet) {
- handler.receivePacket(packet);
- }
-
- @Override
- public void shutdown() {
- synchronized (connectionLock) {
- LOG.info("Disconnected from XMPP server.");
- componentManager = null;
- connected = false;
- }
- }
-
- @Override
- public void start() {
- synchronized (connectionLock) {
- connected = true;
- LOG.info("Connected to XMPP server with JID: " + componentName + "." + serverDomain);
-
- // Send all queued outgoing packets.
- while (!queuedPackets.isEmpty()) {
- componentManager.sendPacket(this, queuedPackets.poll());
- }
- }
- }
-}
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/IncomingPacketHandler.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/IncomingPacketHandler.java
deleted file mode 100644
index cb60ac0..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/IncomingPacketHandler.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import org.xmpp.packet.Packet;
-
-/**
- * Generic incoming XMPP packet handler interface. This should only be
- * implemented by {@link XmppManager}, regardless of which wire transport is in
- * use.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public interface IncomingPacketHandler {
-
- /**
- * Accept a generic XMPP packet from on-the-wire.
- */
- void receivePacket(Packet packet);
-
-}
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/OutgoingPacketTransport.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/OutgoingPacketTransport.java
deleted file mode 100644
index 4899df0..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/OutgoingPacketTransport.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import org.xmpp.packet.Packet;
-
-/**
- * Generic outgoing XMPP packet transport interface. Should be implemented by
- * the handling XMPP transport (e.g. component system).
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public interface OutgoingPacketTransport {
-
- /**
- * Send a packet over-the-wire to its prescribed destination address. Provides
- * no guarantees of delivery or callback.
- */
- void sendPacket(Packet packet);
-
-}
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/PacketCallback.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/PacketCallback.java
deleted file mode 100644
index e86a091..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/PacketCallback.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import org.xmpp.packet.Packet;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-
-/**
- * Simple callback type used for sending and receiving reliable XMPP packet
- * messages. This allows for clearly defined success and failure states.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public interface PacketCallback {
- void run(Packet packet);
- void error(FederationError error);
-}
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/RemoteDisco.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/RemoteDisco.java
deleted file mode 100644
index b6d5868..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/RemoteDisco.java
+++ /dev/null
@@ -1,430 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.MapMaker;
-import com.google.common.collect.Sets;
-
-import org.dom4j.Attribute;
-import org.dom4j.Element;
-import org.joda.time.DateTimeUtils;
-import org.waveprotocol.wave.federation.FederationErrors;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.xmpp.packet.IQ;
-import org.xmpp.packet.Packet;
-
-import java.security.SecureRandom;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-
-/**
- * Represents XMPP disco status for a specific remote domain. This class only
- * exposes one public method; {@link #discoverRemoteJID}.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public class RemoteDisco {
- private static final Logger LOG = Logger.getLogger(RemoteDisco.class.getCanonicalName());
-
- static final int MAXIMUM_DISCO_ATTEMPTS = 5;
- static final int MINIMUM_REXMIT_MS = 15000;
- static final int REXMIT_JITTER_MS = 2000;
- static final int DISCO_INFO_TIMEOUT = 20;
-
- private final long creationTimeMillis;
- private final long failExpirySecs;
- private final long successExpirySecs;
-
- enum Status {
- INIT, PENDING, COMPLETE
- }
-
- private final Random random = new SecureRandom();
- private final XmppManager manager;
- private final String remoteDomain;
- private final AtomicReference<Status> status;
- private final Queue<SuccessFailCallback<String, String>> pending;
-
- // Result JID field that will be available on COMPLETE status.
- private String remoteJid;
-
- // Error field that will be available on COMPLETE status.
- private FederationError error;
-
-
- // These two values are used for tracking success and failure counts.
- // Not yet exposed in the fedone waveserver.
- public static final LoadingCache<String, AtomicLong> statDiscoSuccess =
- CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() {
- @Override
- public AtomicLong load(String domain) {
- return new AtomicLong();
- }
- });
-
- public static final LoadingCache<String, AtomicLong> statDiscoFailed =
- CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() {
- @Override
- public AtomicLong load(String domain) {
- return new AtomicLong();
- }
- });
-
- /**
- * Construct a new RemoteDisco targeting the given domain. This will not kick
- * off the disco request itself.
- * @param manager XmppManager object, used to send packets
- * @param remoteDomain the name of the remote domain (not JID)
- * @param failExpirySecs how long to keep alive a failed disco result
- * @param successExpirySecs how long to keep alive a successful disco result
- */
- public RemoteDisco(XmppManager manager, String remoteDomain, long failExpirySecs,
- long successExpirySecs) {
- this.manager = manager;
- status = new AtomicReference<Status>(Status.INIT);
- pending = new ConcurrentLinkedQueue<SuccessFailCallback<String, String>>();
- this.remoteDomain = remoteDomain;
- this.creationTimeMillis = DateTimeUtils.currentTimeMillis();
- this.failExpirySecs = failExpirySecs;
- this.successExpirySecs = successExpirySecs;
- }
-
- /**
- * Construct a new RemoteDisco - purely for testing - with an already
- * determined result. Either jid or error must be passed.
- *
- * @param remoteDomain the name of the remote domain (not JID)
- * @param jid the domain's remote JID
- * @param error the error from disco
- */
- @VisibleForTesting
- RemoteDisco(String remoteDomain, String jid, FederationError error) {
- Preconditions.checkArgument((jid != null)^(error != null));
-
- manager = null;
- status = new AtomicReference<Status>(Status.COMPLETE);
- pending = null;
- this.remoteDomain = remoteDomain;
- this.remoteJid = jid;
- this.error = error;
- // defaults for testing
- this.creationTimeMillis = DateTimeUtils.currentTimeMillis();
- this.failExpirySecs = 2 * 60;
- this.successExpirySecs = 2 * 60 * 60;
- }
-
- /**
- * Check whether the request is currently PENDING. Visible only for tests.
- * @return true if pending else false
- */
- @VisibleForTesting
- boolean isRequestPending() {
- return status.get().equals(Status.PENDING);
- }
-
- /**
- * Attempt to discover the remote JID for this domain. If the JID has already
- * been discovered, then this method will invoke the callback immediately.
- * Otherwise, the callback is guaranteed to be invoked at a later point.
- *
- * @param callback a callback to be invoked when disco is complete
- */
- public void discoverRemoteJID(SuccessFailCallback<String, String> callback) {
- if (status.get().equals(Status.COMPLETE)) {
- complete(callback);
- } else if (status.compareAndSet(Status.INIT, Status.PENDING)) {
- pending.add(callback);
- startDisco();
- } else {
- pending.add(callback);
-
- // If we've become complete since the start of this method, complete
- // all possible callbacks.
- if (status.get().equals(Status.COMPLETE)) {
- SuccessFailCallback<String, String> item;
- while ((item = pending.poll()) != null) {
- complete(item);
- }
- }
- }
- }
-
- /**
- * Returns true if this RemoteDisco's time to live is exceeded.
- *
- * We can't use MapMaker's expiration code as it won't let us have different expiry for
- * successful and failed cases.
- *
- * @return whether this object should be deleted and recreated
- */
- public boolean ttlExceeded() {
- if (status.get() == Status.COMPLETE) {
- if (remoteJid == null) {
- // Failed disco case
- if (DateTimeUtils.currentTimeMillis() >
- (creationTimeMillis + (1000 * failExpirySecs))) {
- return true;
- }
- } else {
- // Successful disco case
- if (DateTimeUtils.currentTimeMillis() >
- (creationTimeMillis + (1000 * successExpirySecs))) {
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * Complete any specific callback (in the current thread). Requires the status
- * to be COMPLETE.
- *
- * TODO(thorogood): thread model for completing callbacks
- * @param callback the callback to complete
- */
- private void complete(SuccessFailCallback<String, String> callback) {
- Preconditions.checkState(status.get().equals(Status.COMPLETE));
- if (remoteJid != null) {
- callback.onSuccess(remoteJid);
- } else {
- // TODO(thorogood): better toString, or change failure type to FederationError
- callback.onFailure(error.toString());
- }
- }
-
- /**
- * Start XMPP discovery. Kicks off a retrying call to dial-up the remote
- * server and discover its available disco items.
- *
- * This should only be called by a method holding the PENDING state.
- */
- private void startDisco() {
- final IQ request = manager.createRequestIQ(remoteDomain);
- request.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_ITEMS);
-
- final Runnable requester = new Runnable() {
- int attempt = 0;
-
- final PacketCallback callback = new PacketCallback() {
- @Override
- public void run(Packet result) {
- Preconditions.checkArgument(result instanceof IQ, "Manager must provide response IQ");
- processDiscoItemsResult((IQ) result);
- }
-
- @Override
- public void error(FederationError error) {
- if (error.getErrorCode().equals(FederationError.Code.REMOTE_SERVER_TIMEOUT)) {
- retry();
- } else {
- LOG.info("Remote server " + remoteDomain + " failed on disco items: "
- + error.getErrorCode());
- processDiscoItemsResult(null);
- }
- }
- };
-
- void retry() {
- attempt += 1;
- if (attempt > MAXIMUM_DISCO_ATTEMPTS) {
- finish(null, FederationErrors
- .newFederationError(FederationError.Code.REMOTE_SERVER_TIMEOUT));
- } else {
- // TODO(thorogood): fix ms/seconds!
- int timeout = nextDiscoRetransmitTimeout(attempt) / 1000;
- request.setID(XmppUtil.generateUniqueId());
- LOG.info("Sending disco items request for: " + remoteDomain + ", timeout " + timeout
- + " seconds");
- manager.send(request, callback, timeout);
- }
- }
-
- @Override
- public void run() {
- retry();
- }
- };
-
- // Kick off requester!
- requester.run();
- }
-
- /**
- * Calculate the requested timeout for any given request number. Introduces
- * random jitter.
- *
- * @param attempt the attempt count
- * @return request timeout in ms
- */
- private int nextDiscoRetransmitTimeout(int attempt) {
- Preconditions.checkArgument(attempt > 0);
- return MINIMUM_REXMIT_MS * (1 << (attempt - 1)) + random.nextInt(REXMIT_JITTER_MS);
- }
-
- /**
- * Process a returned set of disco items. Invoke a query for each item in
- * parallel, searching for any item which supports Wave.
- *
- * @param result IQ stanza provided from disco items, if null try default items
- */
- private void processDiscoItemsResult(@Nullable IQ result) {
- Set<String> candidates = Sets.newHashSet();
-
- // Traverse the source list, finding possible JID candidates.
- if (result != null) {
- List<Element> items = XmppUtil.toSafeElementList(result.getChildElement().elements("item"));
- for (Element item : items) {
- Attribute jid = item.attribute("jid");
- if (jid != null) {
- candidates.add(jid.getValue());
- }
- }
- }
-
- // Returned nothing for the items list. Try the domain itself.
- if (candidates.isEmpty()) {
- candidates.add(remoteDomain);
- }
-
- // Always query 'wave.', as an automatic fallback.
- candidates.add("wave." + remoteDomain);
-
- // Iterate over all candidates, requesting information in parallel.
- AtomicInteger sharedLatch = new AtomicInteger(candidates.size());
- for (String candidate : candidates) {
- requestDiscoInfo(candidate, sharedLatch);
- }
- }
-
- /**
- * Request disco info from a specific target JID. Accepts a target JID as well
- * as a shared latch: on a result, the latch should be decremented and if it
- * reaches zero, finish() must be invoked with an error.
- *
- * @param target the target JID
- * @param sharedLatch a shared latch
- */
- private void requestDiscoInfo(String target, final AtomicInteger sharedLatch) {
- final IQ request = manager.createRequestIQ(target);
- request.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_INFO);
-
- PacketCallback callback = new PacketCallback() {
- @Override
- public void error(FederationError error) {
- int currentCount = sharedLatch.decrementAndGet();
- Preconditions.checkState(currentCount >= 0,
- "Info latch should not count down past zero for domain: %s", remoteDomain);
- if (currentCount == 0) {
- finish(null, error);
- }
- }
-
- @Override
- public void run(Packet packet) {
- Preconditions.checkArgument(packet instanceof IQ);
- IQ result = (IQ) packet;
-
- List<Element> features =
- XmppUtil.toSafeElementList(result.getChildElement().elements("feature"));
- for (Element feature : features) {
- Attribute var = feature.attribute("var");
- if (var != null && var.getValue().equals(XmppNamespace.NAMESPACE_WAVE_SERVER)) {
- String targetJID = packet.getFrom().toString();
- finish(targetJID, null);
-
- // Decrement the latch *after* finishing, so we don't allow an error
- // callback to be kicked off.
- Preconditions.checkState(sharedLatch.decrementAndGet() >= 0,
- "Info latch should not count down past zero for domain: %s", remoteDomain);
- return;
- }
- }
-
- // This result didn't contain a useful result JID, so cause an error.
- error(FederationErrors.newFederationError(FederationError.Code.ITEM_NOT_FOUND));
- }
- };
-
- LOG.info("Sending disco info request for: " + target);
- manager.send(request, callback, DISCO_INFO_TIMEOUT);
- }
-
- /**
- * Finish this disco attempt with either a success or error result. This
- * method should only be called on a thread that owns the PENDING state and
- * will (if successful) result in a transition to COMPLETE. If the disco
- * attempt is already complete, return false and do nothing (safe operation).
- *
- * @param jid success JID, or null
- * @param error error proto, or null
- * @return true if successful, false if already finished
- */
- @VisibleForTesting
- boolean finish(String jid, FederationError error) {
- Preconditions.checkArgument((jid != null)^(error != null));
- if (!status.compareAndSet(Status.PENDING, Status.COMPLETE)) {
- return false;
- }
-
- // Set either the result JID or error state.
-
- try {
- if (jid != null) {
- this.remoteJid = jid;
- LOG.info("Discovered remote JID: " + jid + " for " + remoteDomain);
- statDiscoSuccess.get(remoteDomain).incrementAndGet();
- } else if (error != null) {
- this.error = error;
- LOG.info("Could not discover remote JID: " + error + " for " + remoteDomain);
- statDiscoFailed.get(remoteDomain).incrementAndGet();
- } else {
- throw new IllegalArgumentException("At least one of jid/error must be set");
- }
- } catch (ExecutionException ex) {
- throw new RuntimeException(ex);
- }
-
- // Complete all available callbacks.
- SuccessFailCallback<String, String> item;
- while ((item = pending.poll()) != null) {
- complete(item);
- }
- return true;
- }
-}
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/SuccessFailCallback.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/SuccessFailCallback.java
deleted file mode 100644
index d88a141..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/SuccessFailCallback.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-/**
- * A generic onSuccess/onFailure callback interface.
- *
- * @author kalman@google.com (Ben Kalman)
- */
-public interface SuccessFailCallback<SuccessValue, FailureValue> {
- void onSuccess(SuccessValue response);
- void onFailure(FailureValue reason);
-}
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppDisco.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppDisco.java
deleted file mode 100644
index 3e03ef7..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppDisco.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
-import org.dom4j.Element;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.waveprotocol.wave.federation.FederationErrors;
-import org.xmpp.packet.IQ;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Implementation of XMPP Discovery. Provides public methods to respond to incoming disco requests
- * (via {@link XmppManager}), as well as outgoing disco via {{@link #discoverRemoteJid}.
- *
- * @author arb@google.com (Anthony Baxter)
- * @author thorogood@google.com (Sam Thorogood)
- */
-public class XmppDisco {
-
- @SuppressWarnings("unused")
- private static final Logger LOG = Logger.getLogger(XmppDisco.class.getCanonicalName());
-
- // This tracks the number of disco attempts started.
- public static final LoadingCache<String, AtomicLong> statDiscoStarted =
- CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() {
- @Override
- public AtomicLong load(@SuppressWarnings("NullableProblems") String domain) {
- return new AtomicLong();
- }
- });
-
- private final LoadingCache<String, RemoteDisco> discoRequests;
- private final String serverDescription;
-
- private XmppManager manager = null;
- // Accessed by XmppFederationHostForDomain.
- final long failExpirySecs;
- final long successExpirySecs;
- final long discoExpirationHours;
- final String discoInfoCategory;
- final String discoInfoType;
-
- /**
- * Constructor. Note that {@link #setManager} must be called before this class is ready to use.
- */
- @Inject
- public XmppDisco(Config config) {
- this.serverDescription = config.getString("federation.xmpp_server_description");
- this.discoInfoCategory = config.getString("federation.disco_info_category");
- this.discoInfoType = config.getString("federation.disco_info_type");
- this.failExpirySecs = config.getDuration("federation.xmpp_disco_failed_expiry", TimeUnit.SECONDS);
- this.successExpirySecs = config.getDuration("federation.xmpp_disco_successful_expiry", TimeUnit.SECONDS);
- this.discoExpirationHours = config.getDuration("federation.disco_expiration", TimeUnit.HOURS);
-
- //noinspection NullableProblems
- discoRequests =
- CacheBuilder.newBuilder().expireAfterWrite(
- discoExpirationHours, TimeUnit.HOURS).build(
- new CacheLoader<String, RemoteDisco>() {
-
- @Override
- public RemoteDisco load(String domain) throws Exception {
- statDiscoStarted.get(domain).incrementAndGet();
- return new RemoteDisco(manager, domain, failExpirySecs, successExpirySecs);
- }
- });
- }
-
- /**
- * Set the manager instance for this class. Must be invoked before any other
- * methods are used.
- * @param manager an XmppManager instance
- */
- public void setManager(XmppManager manager) {
- this.manager = manager;
- }
-
- /**
- * Handles a disco info get from a foreign source. A remote server is trying to ask us what we
- * support. Send back a message identifying as a wave component.
- *
- * @param iq the IQ packet.
- * @param responseCallback callback used to send response
- */
- void processDiscoInfoGet(IQ iq, PacketCallback responseCallback) {
- IQ response = IQ.createResultIQ(iq);
- Element query = response.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_INFO);
-
- query.addElement("identity")
- .addAttribute("category", discoInfoCategory)
- .addAttribute("type", discoInfoType)
- .addAttribute("name", serverDescription);
-
- query.addElement("feature")
- .addAttribute("var", XmppNamespace.NAMESPACE_WAVE_SERVER);
-
- responseCallback.run(response);
- }
-
-
- /**
- * Handles a disco items get from a foreign XMPP agent. No useful responses, since we're not a
- * domain on it's own: just the wave component.
- *
- * @param iq the IQ packet.
- * @param responseCallback callback used to send response
- */
- void processDiscoItemsGet(IQ iq, PacketCallback responseCallback) {
- IQ response = IQ.createResultIQ(iq);
- response.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_ITEMS);
- responseCallback.run(response);
- }
-
- /**
- * Attempt to discover the remote JID for this domain. Hands control to {@link RemoteDisco}.
- *
- * @param remoteDomain the domain to discover
- * @param callback a callback to trigger when disco completes
- */
- public void discoverRemoteJid(String remoteDomain, SuccessFailCallback<String, String> callback) {
- Preconditions.checkNotNull("Must call setManager first", manager);
- RemoteDisco disco = discoRequests.getIfPresent(remoteDomain);
- if (disco != null) {
- // This is a race condition, but we don't care if we lose it, because the ttl timestamp
- // won't be exceeded in that case.
- if (disco.ttlExceeded()) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.info("discoverRemoteJid for " + remoteDomain + ": result ttl exceeded.");
- }
- // TODO(arb): should we expose the disco cache somehow for debugging?
- discoRequests.invalidate(remoteDomain);
- }
- }
- try {
- discoRequests.get(remoteDomain).discoverRemoteJID(callback);
- } catch (ExecutionException ex) {
- throw new RuntimeException(ex);
- }
- }
-
- /**
- * Inject a predetermined result into the disco results map. If the passed jid is null, generate
- * an error/not-found case.
- *
- * @param domain remote domain
- * @param jid remote JID
- * @throws IllegalStateException if there is already a result for this domain
- */
- @VisibleForTesting
- void testInjectInDomainToJidMap(String domain, String jid) {
- FederationError error = null;
- if (jid == null) {
- error = FederationErrors.badRequest("Fake injected error");
- }
- RemoteDisco disco = discoRequests.getIfPresent(domain);
- Preconditions.checkState(disco == null);
- discoRequests.put(domain, new RemoteDisco(domain, jid, error));
- }
-
- /**
- * Determine whether a request for the given domain is pending.
- *
- * @param domain remote domain
- * @return true/false
- */
- @VisibleForTesting
- boolean isDiscoRequestPending(String domain) throws ExecutionException {
- RemoteDisco disco = discoRequests.getIfPresent(domain);
- return disco != null && disco.isRequestPending();
- }
-
- /**
- * Determine whether the disco request for the given domain has been touched or is at all
- * available.
- *
- * @param domain remote domain
- * @return true/false
- */
- @VisibleForTesting
- boolean isDiscoRequestAvailable(String domain) {
- return discoRequests.getIfPresent(domain) != null;
- }
-}
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHost.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHost.java
deleted file mode 100644
index 09c76d1..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHost.java
+++ /dev/null
@@ -1,446 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.typesafe.config.Config;
-import org.dom4j.Element;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.waveprotocol.wave.federation.FederationErrors;
-import org.waveprotocol.wave.federation.FederationHostBridge;
-import org.waveprotocol.wave.federation.Proto.ProtocolHashedVersion;
-import org.waveprotocol.wave.federation.Proto.ProtocolSignedDelta;
-import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo;
-import org.waveprotocol.wave.federation.WaveletFederationListener;
-import org.waveprotocol.wave.federation.WaveletFederationProvider;
-import org.waveprotocol.wave.federation.xmpp.XmppUtil.UnknownSignerType;
-import org.waveprotocol.wave.model.id.URIEncoderDecoder.EncodingException;
-import org.waveprotocol.wave.model.id.WaveletName;
-import org.xmpp.packet.IQ;
-
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.logging.Logger;
-
-/**
- * This class encapsulates the incoming packet processing portion of the
- * Federation Host. Messages arrive on this class from a foreign Federation
- * Remote for wavelets hosted by the local wave server.
- */
-public class XmppFederationHost implements WaveletFederationListener.Factory {
- @SuppressWarnings("unused")
- private static final Logger LOG = Logger.getLogger(XmppFederationHost.class.getCanonicalName());
-
- private final WaveletFederationProvider waveletProvider;
-
- private XmppManager manager = null;
-
- // A map of update listeners. There is one per remote domain we are sending updates to.
- // The name 'listener' refers to them listening for updates from the waveserver to send to the
- // network.
- private final LoadingCache<String, WaveletFederationListener> listeners;
-
- /**
- * Constructor. Note that {@link #setManager} must be called before this class
- * is ready to use.
- *
- * @param waveletProvider used for communicating back to the Host part of the
- * wavelet server.
- * @param disco used for discovery
- */
- @Inject
- public XmppFederationHost(@FederationHostBridge WaveletFederationProvider waveletProvider,
- final XmppDisco disco, final Config config) {
- this.waveletProvider = waveletProvider;
- listeners = CacheBuilder.newBuilder().build(new CacheLoader<String, WaveletFederationListener>() {
- @Override
- public WaveletFederationListener load(@SuppressWarnings("NullableProblems") String domain) {
- return new XmppFederationHostForDomain(domain, manager, disco, config);
- }
- });
- }
-
- /**
- * Set the manager instance for this class. Must be invoked before any other
- * methods are used.
- * @param manager the XmppManager object, used to send packets.
- */
- public void setManager(XmppManager manager) {
- this.manager = manager;
- }
-
- /**
- * Parse to a ProtocolHashedVersion from a given string version/base64-hash combination.
- *
- * @param startVersion the starting version
- * @param base64Hash the base64 hash
- * @throws IllegalArgumentException on bad data
- * @return a parsed protobuf object
- */
- private static ProtocolHashedVersion parseFromUnsafe(String startVersion, String base64Hash)
- throws IllegalArgumentException {
- return ProtocolHashedVersion.newBuilder()
- .setVersion(Long.parseLong(startVersion))
- .setHistoryHash(Base64Util.decode(base64Hash)).build();
- }
-
- /**
- * Reads a history request off the wire, sends it to the WS with a new
- * callback for returning the response.
- * @param request the history request
- * @param responseCallback the callback to send the response back
- */
- void processHistoryRequest(final IQ request, final PacketCallback responseCallback) {
- Element items = null, historyDelta = null;
- Element pubsubRequest = request.getElement().element("pubsub");
- if (pubsubRequest != null) {
- items = pubsubRequest.element("items");
- if (items != null) {
- historyDelta = items.element("delta-history");
- }
- }
- if (items == null || historyDelta == null
- || historyDelta.attribute("start-version") == null
- || historyDelta.attribute("start-version-hash") == null
- || historyDelta.attribute("end-version") == null
- || historyDelta.attribute("end-version-hash") == null
- || historyDelta.attribute("wavelet-name") == null) {
- responseCallback.error(FederationErrors.badRequest("Malformed history request"));
- return;
- }
-
- final ProtocolHashedVersion startVersion;
- try {
- startVersion = parseFromUnsafe(historyDelta.attributeValue("start-version"),
- historyDelta.attributeValue("start-version-hash"));
- } catch (IllegalArgumentException e) {
- responseCallback.error(FederationErrors.badRequest("Invalid format of start version"));
- return;
- }
-
- final ProtocolHashedVersion endVersion;
- try {
- endVersion = parseFromUnsafe(historyDelta.attributeValue("end-version"),
- historyDelta.attributeValue("end-version-hash"));
- } catch (IllegalArgumentException e) {
- responseCallback.error(FederationErrors.badRequest("Invalid format of end version"));
- return;
- }
-
- final long responseLengthLimit;
- if (historyDelta.attribute("response-length-limit") != null) {
- try {
- responseLengthLimit = Long.parseLong(historyDelta.attributeValue("response-length-limit"));
- } catch (NumberFormatException e) {
- responseCallback.error(FederationErrors.badRequest("Invalid response length limit"));
- return;
- }
- } else {
- responseLengthLimit = 0;
- }
-
- final WaveletName waveletName;
- try {
- waveletName =
- XmppUtil.waveletNameCodec.uriToWaveletName(historyDelta.attributeValue("wavelet-name"));
- } catch (EncodingException e) {
- responseCallback.error(FederationErrors.badRequest(
- "Malformed wavelet name: " + historyDelta.attributeValue("wavelet-name")));
- return;
- }
-
- // Construct a new response listener inline.
- WaveletFederationProvider.HistoryResponseListener listener =
- new WaveletFederationProvider.HistoryResponseListener() {
- @Override
- public void onFailure(FederationError error) {
- responseCallback.error(error);
- }
-
- @Override
- public void onSuccess(List<ByteString> appliedDeltaSet,
- ProtocolHashedVersion lastCommittedVersion, long versionTruncatedAt) {
- IQ response = IQ.createResultIQ(request);
-
- Element pubsub = response.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB);
- Element items = pubsub.addElement("items");
-
- // Add each delta to the outgoing response.
- for (ByteString appliedDelta : appliedDeltaSet) {
- items.addElement("item").addElement("applied-delta",
- XmppNamespace.NAMESPACE_WAVE_SERVER).addCDATA(
- Base64Util.encode(appliedDelta.toByteArray()));
- }
-
- // Set the LCV history-hash, if provided.
- // TODO(thorogood): We don't set the hashed version, which is wrong,
- // but it's not part of the current spec (Feb 2010).
- if (lastCommittedVersion != null && lastCommittedVersion.hasVersion()) {
- String version = String.valueOf(lastCommittedVersion.getVersion());
- items.addElement("item").addElement("commit-notice",
- XmppNamespace.NAMESPACE_WAVE_SERVER).addAttribute("version", version);
- }
-
- // Set the version truncated at, if provided.
- if (versionTruncatedAt > 0) {
- String version = String.valueOf(versionTruncatedAt);
- items.addElement("item").addElement("history-truncated",
- XmppNamespace.NAMESPACE_WAVE_SERVER).addAttribute("version", version);
- }
-
- // Send the message to the source.
- responseCallback.run(response);
- }
- };
-
- // Hand off a history request to the waveletProvider.
- // TODO(thorogood,arb): Note that the following remote domain is going to be
- // the Wave component JID (e.g. wave.foo.com), and *not* the actual remote domain.
- String remoteDomain = request.getFrom().getDomain();
- waveletProvider.requestHistory(waveletName, remoteDomain, startVersion,
- endVersion, responseLengthLimit, listener);
- }
-
- /**
- * Handles a submit request from a foreign wave remote. Sends it to the wave
- * server, sets up a callback to send the response.
- * @param request the submit request
- * @param responseCallback the callback to send the response back
- */
- void processSubmitRequest(final IQ request, final PacketCallback responseCallback) {
- Element item = null, submitRequest = null, deltaElement = null;
- Element pubsubRequest = request.getElement().element("pubsub");
- // TODO: check for correct elements.
- Element publish = pubsubRequest.element("publish");
- if (publish != null) {
- item = publish.element("item");
- if (item != null) {
- submitRequest = item.element("submit-request");
- if (submitRequest != null) {
- deltaElement = submitRequest.element("delta");
- }
- }
- }
- if (publish == null || item == null || submitRequest == null
- || deltaElement == null
- || deltaElement.attribute("wavelet-name") == null
- || deltaElement.getText() == null) {
- responseCallback.error(FederationErrors.badRequest("Malformed submit request"));
- return;
- }
-
- final WaveletName waveletName;
- try {
- waveletName =
- XmppUtil.waveletNameCodec.uriToWaveletName(deltaElement.attributeValue("wavelet-name"));
- } catch (EncodingException e) {
- responseCallback.error(FederationErrors.badRequest(
- "Malformed wavelet name: " + deltaElement.attributeValue("wavelet-name")));
- return;
- }
-
- final ProtocolSignedDelta delta;
- try {
- delta = ProtocolSignedDelta.parseFrom(Base64Util.decode(deltaElement.getText()));
- } catch (InvalidProtocolBufferException e) {
- responseCallback.error(FederationErrors.badRequest(
- "Malformed delta, not a valid protocol buffer"));
- return;
- }
-
- // Construct a submit result listener inline.
- WaveletFederationProvider.SubmitResultListener listener =
- new WaveletFederationProvider.SubmitResultListener() {
- @Override
- public void onFailure(FederationError error) {
- responseCallback.error(error);
- }
-
- @Override
- public void onSuccess(int operations, ProtocolHashedVersion version, long timestamp) {
- IQ response = IQ.createResultIQ(request);
-
- Element pubsub = response.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB);
- Element submitResponse = pubsub.addElement("publish").addElement("item")
- .addElement("submit-response", XmppNamespace.NAMESPACE_WAVE_SERVER);
-
- submitResponse.addAttribute("application-timestamp", String.valueOf(timestamp));
- submitResponse.addAttribute("operations-applied", String.valueOf(operations));
-
- Element hashedVersion = submitResponse.addElement("hashed-version");
- hashedVersion.addAttribute("history-hash", Base64Util.encode(version.getHistoryHash()));
- hashedVersion.addAttribute("version", String.valueOf(version.getVersion()));
-
- responseCallback.run(response);
- }
- };
-
- // Hand off the submit request to the wavelet provider.
- waveletProvider.submitRequest(waveletName, delta, listener);
- }
-
- /**
- * Reads a get signer request off the wire, sends it to the WS with a new
- * callback for returning the response.
- * @param request the get signer request
- * @param responseCallback the callback to send the response back
- */
- void processGetSignerRequest(final IQ request, final PacketCallback responseCallback) {
- Element items = request.getChildElement().element("items");
- Element signerRequest = items != null ? items.element("signer-request") : null;
-
- if (items == null || signerRequest == null
- || signerRequest.attributeValue("wavelet-name") == null
- || signerRequest.attributeValue("signer-id") == null
- || signerRequest.attributeValue("version") == null
- || signerRequest.attributeValue("history-hash") == null) {
- manager.sendErrorResponse(request, FederationErrors.badRequest("Malformed signer request"));
- return;
- }
-
- final ByteString signerId;
- try {
- signerId = Base64Util.decode(signerRequest.attributeValue("signer-id"));
- } catch (IllegalArgumentException e) {
- responseCallback.error(FederationErrors.badRequest("Malformed signer ID"));
- return;
- }
-
- final ProtocolHashedVersion deltaEndVersion;
- try {
- deltaEndVersion = parseFromUnsafe(signerRequest.attributeValue("version"),
- signerRequest.attributeValue("history-hash"));
- } catch (IllegalArgumentException e) {
- responseCallback.error(FederationErrors.badRequest("Invalid hashed version"));
- return;
- }
-
- final WaveletName waveletName;
- try {
- waveletName =
- XmppUtil.waveletNameCodec.uriToWaveletName(signerRequest.attributeValue("wavelet-name"));
- } catch (EncodingException e) {
- responseCallback.error(FederationErrors.badRequest("Malformed wavelet name"));
- return;
- }
-
- WaveletFederationProvider.DeltaSignerInfoResponseListener listener =
- new WaveletFederationProvider.DeltaSignerInfoResponseListener() {
- @Override
- public void onFailure(FederationError error) {
- responseCallback.error(error);
- }
-
- @Override
- public void onSuccess(ProtocolSignerInfo signerInfo) {
- IQ response = IQ.createResultIQ(request);
-
- Element pubsub = response.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB);
- Element items = pubsub.addElement("items");
- XmppUtil.protocolSignerInfoToXml(signerInfo, items);
-
- responseCallback.run(response);
- }
-
- };
-
- waveletProvider.getDeltaSignerInfo(signerId, waveletName, deltaEndVersion, listener);
- }
-
- /**
- * Reads a post signer request off the wire, sends it to the WS with a new
- * callback for returning the response.
- * @param request the post signer request
- * @param responseCallback the callback to send the response back
- */
- void processPostSignerRequest(final IQ request, final PacketCallback responseCallback) {
- Element item = null, signatureElement = null;
- Element pubsubRequest = request.getElement().element("pubsub");
- Element publish = pubsubRequest.element("publish");
- if (publish != null) {
- item = publish.element("item");
- if (item != null) {
- signatureElement = item.element("signature");
- }
- }
-
- if (publish == null || item == null || signatureElement == null
- || signatureElement.attribute("domain") == null
- || signatureElement.attribute("algorithm") == null
- || signatureElement.element("certificate") == null) {
- responseCallback.error(FederationErrors.badRequest("Malformed post signer request"));
- return;
- }
-
- ProtocolSignerInfo signer;
- try {
- signer = XmppUtil.xmlToProtocolSignerInfo(signatureElement);
- } catch (UnknownSignerType e) {
- responseCallback.error(FederationErrors.badRequest(
- "Could not understand signer algorithm: " + e));
- return;
- }
-
- WaveletFederationProvider.PostSignerInfoResponseListener listener =
- new WaveletFederationProvider.PostSignerInfoResponseListener() {
- @Override
- public void onFailure(FederationError error) {
- responseCallback.error(error);
- }
-
- @Override
- public void onSuccess() {
- IQ response = IQ.createResultIQ(request);
-
- Element pubsub = response.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB);
- Element item = pubsub.addElement("publish").addElement("item");
-
- item.addAttribute("node", "signer");
- item.addElement("signature-response", XmppNamespace.NAMESPACE_WAVE_SERVER);
-
- responseCallback.run(response);
- }
- };
-
- // TODO(thorogood,arb): This field is a Bad Idea; it could be faked and not
- // be a provider we host on this instance. Instead, we should infer from the
- // "To:" JID.
- String targetDomain = signatureElement.attributeValue("domain");
-
- // The first argument is the domain we intend to send this information to.
- waveletProvider.postSignerInfo(targetDomain, signer, listener);
- }
-
- @Override
- public WaveletFederationListener listenerForDomain(String domain) {
- try {
- // TODO(thorogood): Kick off disco here instead of inside
- // XmppFederationHostForDomain.
- return listeners.get(domain);
- } catch (ExecutionException ex) {
- throw new RuntimeException(ex);
- }
- }
-}
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHostForDomain.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHostForDomain.java
deleted file mode 100644
index d809b33..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHostForDomain.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.inject.Inject;
-import com.google.protobuf.ByteString;
-import com.typesafe.config.Config;
-import org.dom4j.Element;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.waveprotocol.wave.federation.FederationErrors;
-import org.waveprotocol.wave.federation.Proto.ProtocolHashedVersion;
-import org.waveprotocol.wave.federation.WaveletFederationListener;
-import org.waveprotocol.wave.model.id.URIEncoderDecoder.EncodingException;
-import org.waveprotocol.wave.model.id.WaveletName;
-import org.xmpp.packet.Message;
-import org.xmpp.packet.Packet;
-
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * An instance of this class is created on demand for outgoing
- * messages to another wave Federation Remote. The wave server asks
- * the XmppFederationHost to create these.
- */
-class XmppFederationHostForDomain implements WaveletFederationListener {
-
- private static final Logger LOG =
- Logger.getLogger(XmppFederationHostForDomain.class.getCanonicalName());
-
- // Timeout for outstanding listener updates sent over XMPP.
- private static final int XMPP_LISTENER_TIMEOUT = 30;
-
- private final String remoteDomain;
- private final XmppManager manager;
- private final String jid;
- private final XmppDisco disco;
-
- @Inject
- public XmppFederationHostForDomain(final String domain, XmppManager manager,
- XmppDisco disco, Config config) {
- this.remoteDomain = domain;
- this.manager = manager;
- this.jid = config.getString("federation.xmpp_jid");
- this.disco = disco;
- }
-
- @Override
- public void waveletCommitUpdate(WaveletName waveletName, ProtocolHashedVersion committedVersion,
- WaveletUpdateCallback callback) {
- waveletUpdate(waveletName, null, committedVersion, callback);
- }
-
- @Override
- public void waveletDeltaUpdate(WaveletName waveletName, List<ByteString> appliedDeltas,
- WaveletUpdateCallback callback) {
- waveletUpdate(waveletName, appliedDeltas, null, callback);
- }
-
- /**
- * Sends a wavelet update message on behalf of the wave server. This
- * method just triggers a disco lookup (which may be cached) and
- * sets up a callback to call the real method that does the work.
- * This method may contain applied deltas, a commit notice, or both.
- *
- * @param waveletName the wavelet name
- * @param deltaList the deltas to include in the message, or null
- * @param committedVersion last committed version to include, or null
- * @param callback callback to invoke on delivery success/failure
- */
- public void waveletUpdate(final WaveletName waveletName, final List<ByteString> deltaList,
- final ProtocolHashedVersion committedVersion, final WaveletUpdateCallback callback) {
- if ((deltaList == null || deltaList.isEmpty()) && committedVersion == null) {
- throw new IllegalArgumentException("Must send at least one delta, or a last committed " +
- "version notice, for the target wavelet: " + waveletName);
- }
-
- disco.discoverRemoteJid(remoteDomain, new SuccessFailCallback<String, String>() {
- @Override
- public void onSuccess(String remoteJid) {
- internalWaveletUpdate(waveletName, deltaList, committedVersion, callback, remoteJid);
- }
-
- @Override
- public void onFailure(String errorMessage) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Disco failed for remote domain " + remoteDomain + ", update not sent");
- }
- callback.onFailure(FederationErrors.newFederationError(
- FederationError.Code.RESOURCE_CONSTRAINT, errorMessage));
- }
- });
- }
-
- /**
- * Sends a wavelet update message on behalf of the wave server once disco is
- * complete. This method may contain applied deltas, a commit notice, or both.
- *
- * @param waveletName the wavelet name
- * @param deltaList the deltas to include in the message, or null
- * @param committedVersion last committed version to include, or null
- * @param callback callback to invoke on delivery success/failure
- * @param remoteJid the remote JID to send the update to
- */
- private void internalWaveletUpdate(final WaveletName waveletName,
- final List<ByteString> deltaList, final ProtocolHashedVersion committedVersion,
- final WaveletUpdateCallback callback, String remoteJid) {
- Message message = new Message();
- message.setType(Message.Type.normal);
- message.setFrom(jid);
- message.setTo(remoteJid);
- message.setID(XmppUtil.generateUniqueId());
- message.addChildElement("request", XmppNamespace.NAMESPACE_XMPP_RECEIPTS);
-
- final String encodedWaveletName;
- try {
- encodedWaveletName = XmppUtil.waveletNameCodec.waveletNameToURI(waveletName);
- } catch (EncodingException e) {
- callback.onFailure(FederationErrors.badRequest("Bad wavelet name " + waveletName));
- return;
- }
-
- Element itemElement = message.addChildElement("event", XmppNamespace.NAMESPACE_PUBSUB_EVENT)
- .addElement("items").addElement("item");
- if (deltaList != null) {
- for (ByteString delta : deltaList) {
- Element waveletUpdate =
- itemElement.addElement("wavelet-update", XmppNamespace.NAMESPACE_WAVE_SERVER)
- .addAttribute("wavelet-name", encodedWaveletName);
- waveletUpdate.addElement("applied-delta").addCDATA(Base64Util.encode(delta.toByteArray()));
- }
- }
- if (committedVersion != null) {
- Element waveletUpdate =
- itemElement.addElement("wavelet-update", XmppNamespace.NAMESPACE_WAVE_SERVER)
- .addAttribute("wavelet-name", encodedWaveletName);
- waveletUpdate.addElement("commit-notice").addAttribute("version",
- Long.toString(committedVersion.getVersion())).addAttribute("history-hash",
- Base64Util.encode(committedVersion.getHistoryHash()));
- }
-
- // Send the generated message through to the foreign XMPP server.
- manager.send(message, new PacketCallback() {
- @Override
- public void error(FederationError error) {
- callback.onFailure(error);
- }
-
- @Override
- public void run(Packet packet) {
- callback.onSuccess();
- }
- }, XMPP_LISTENER_TIMEOUT);
- }
-}
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationModule.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationModule.java
deleted file mode 100644
index 0be6e94..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationModule.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Singleton;
-
-import org.waveprotocol.wave.federation.FederationHostBridge;
-import org.waveprotocol.wave.federation.FederationRemoteBridge;
-import org.waveprotocol.wave.federation.FederationTransport;
-import org.waveprotocol.wave.federation.WaveletFederationListener;
-import org.waveprotocol.wave.federation.WaveletFederationProvider;
-
-/**
- * Module for setting up an XMPP federation subsystem
- *
- * @author tad.glines@gmail.com (Tad Glines)
- */
-public class XmppFederationModule extends AbstractModule {
-
- @Override
- protected void configure() {
- // Request history and submit deltas to the outside world *from* our local
- // Wave Server.
- bind(WaveletFederationProvider.class).annotatedWith(FederationRemoteBridge.class).to(
- XmppFederationRemote.class).in(Singleton.class);
-
- // Serve updates to the outside world about local waves.
- bind(WaveletFederationListener.Factory.class).annotatedWith(FederationHostBridge.class).to(
- XmppFederationHost.class).in(Singleton.class);
-
- bind(XmppDisco.class).in(Singleton.class);
- bind(XmppFederationRemote.class).in(Singleton.class);
- bind(XmppFederationHost.class).in(Singleton.class);
-
- bind(XmppManager.class).in(Singleton.class);
- bind(IncomingPacketHandler.class).to(XmppManager.class);
- bind(ComponentPacketTransport.class).in(Singleton.class);
- bind(OutgoingPacketTransport.class).to(ComponentPacketTransport.class);
-
- bind(FederationTransport.class).to(XmppFederationTransport.class).in(Singleton.class);
- }
-
-}
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationRemote.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationRemote.java
deleted file mode 100644
index a28bd72..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationRemote.java
+++ /dev/null
@@ -1,633 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.inject.Inject;
-import com.google.protobuf.ByteString;
-import com.typesafe.config.Config;
-import org.apache.commons.codec.binary.Base64;
-import org.dom4j.Attribute;
-import org.dom4j.Element;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.waveprotocol.wave.federation.FederationErrors;
-import org.waveprotocol.wave.federation.FederationRemoteBridge;
-import org.waveprotocol.wave.federation.Proto.ProtocolHashedVersion;
-import org.waveprotocol.wave.federation.Proto.ProtocolSignedDelta;
-import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo;
-import org.waveprotocol.wave.federation.WaveletFederationListener;
-import org.waveprotocol.wave.federation.WaveletFederationProvider;
-import org.waveprotocol.wave.federation.xmpp.XmppUtil.UnknownSignerType;
-import org.waveprotocol.wave.model.id.URIEncoderDecoder.EncodingException;
-import org.waveprotocol.wave.model.id.WaveletName;
-import org.xmpp.packet.IQ;
-import org.xmpp.packet.Message;
-import org.xmpp.packet.Packet;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Logger;
-
-/**
- * Remote implementation. Receives submit and history requests from the local
- * wave server and sends them to a remote wave server Host, and also receives
- * update messages from a remote wave server Host and sends them to the local
- * wave server.
- */
-public class XmppFederationRemote implements WaveletFederationProvider {
- private static final Logger LOG = Logger.getLogger(XmppFederationRemote.class.getCanonicalName());
-
- // Timeout for outstanding provider calls sent over XMPP.
- private static final int XMPP_PROVIDER_TIMEOUT = 30;
-
- private final WaveletFederationListener.Factory updatesListenerFactory;
- private final XmppDisco disco;
- private final String jid;
-
- private XmppManager manager = null;
-
- /**
- * Constructor. Note that {@link #setManager} must be called before this class
- * is ready to use.
- *
- * @param updatesListenerFactory used to communicate back to the local wave
- * server when an update arrives.
- */
- @Inject
- public XmppFederationRemote(
- @FederationRemoteBridge WaveletFederationListener.Factory updatesListenerFactory,
- XmppDisco disco, Config config) {
- this.updatesListenerFactory = updatesListenerFactory;
- this.disco = disco;
- this.jid = config.getString("federation.xmpp_jid");
- }
-
- /**
- * Set the manager instance for this class. Must be invoked before any other
- * methods are used.
- */
- public void setManager(XmppManager manager) {
- this.manager = manager;
- }
-
- /**
- * Request submission of signed delta. This is part of the Federation Remote
- * interface - sends a submit request on behalf of the wave server. Part of
- * the WaveletFederationProvider interface.
- *
- * @param waveletName name of wavelet.
- * @param signedDelta delta signed by the submitting wave server.
- * @param listener callback for the result of the submit.
- */
- @Override
- public void submitRequest(final WaveletName waveletName,
- final ProtocolSignedDelta signedDelta,
- final SubmitResultListener listener) {
-
- final IQ submitIq = new IQ(IQ.Type.set);
- submitIq.setID(XmppUtil.generateUniqueId());
-
- LOG.info("Submitting delta to remote server, wavelet " + waveletName);
- submitIq.setFrom(jid);
-
- Element pubsub = submitIq.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB);
- Element publish = pubsub.addElement("publish");
- publish.addAttribute("node", "wavelet");
- Element submitRequest = publish.addElement("item").addElement("submit-request",
- XmppNamespace.NAMESPACE_WAVE_SERVER);
- Element deltaElement = submitRequest.addElement("delta");
-
- deltaElement.addCDATA(Base64Util.encode(signedDelta.toByteArray()));
- try {
- deltaElement.addAttribute("wavelet-name",
- XmppUtil.waveletNameCodec.waveletNameToURI(waveletName));
- } catch (EncodingException e) {
- listener.onFailure(FederationErrors.badRequest(
- "Couldn't encode wavelet name " + waveletName));
- return;
- }
-
- final PacketCallback callback = new PacketCallback() {
- @Override
- public void error(FederationError error) {
- listener.onFailure(error);
- }
-
- @Override
- public void run(Packet packet) {
- processSubmitResponse(packet, listener);
- }
- };
-
- disco.discoverRemoteJid(waveletName.waveletId.getDomain(),
- new SuccessFailCallback<String, String>() {
- @Override
- public void onSuccess(String remoteJid) {
- Preconditions.checkNotNull(remoteJid);
- submitIq.setTo(remoteJid);
- manager.send(submitIq, callback, XMPP_PROVIDER_TIMEOUT);
- }
-
- @Override
- public void onFailure(String errorMessage) {
- // TODO(thorogood): Broken, Disco should return the error (and it
- // should be timeout/etc)
- listener.onFailure(FederationErrors.badRequest(
- "No such wave server " + waveletName.waveletId.getDomain() + ": " + errorMessage));
- }
- });
- }
-
- /**
- * Retrieve delta history for the given wavelet. <p/> Part of the
- * WaveletFederationProvider interface.
- *
- * @param waveletName name of wavelet.
- * @param domain the remote Federation Host
- * @param startVersion beginning of range (inclusive), minimum 0.
- * @param endVersion end of range (exclusive).
- * @param lengthLimit estimated size, in bytes, as an upper limit on the
- * amount of data returned.
- * @param listener callback for the result.
- */
- public void requestHistory(final WaveletName waveletName,
- final String domain,
- ProtocolHashedVersion startVersion,
- ProtocolHashedVersion endVersion,
- long lengthLimit,
- final WaveletFederationProvider.HistoryResponseListener listener) {
- final IQ submitIq = new IQ(IQ.Type.get);
- submitIq.setID(XmppUtil.generateUniqueId());
-
- LOG.info("Getting history from remote server, wavelet " + waveletName
- + " version " + startVersion + " (inc) through " + endVersion
- + " (ex)");
- submitIq.setFrom(jid);
-
- Element pubsub =
- submitIq.setChildElement("pubsub",
- XmppNamespace.NAMESPACE_PUBSUB);
- Element items = pubsub.addElement("items");
- items.addAttribute("node", "wavelet");
- Element historyDelta =
- items.addElement("delta-history",
- XmppNamespace.NAMESPACE_WAVE_SERVER);
-
- historyDelta.addAttribute("start-version", Long.toString(startVersion
- .getVersion()));
- historyDelta.addAttribute("start-version-hash", Base64Util
- .encode(startVersion.getHistoryHash()));
- historyDelta.addAttribute("end-version", Long.toString(endVersion
- .getVersion()));
- historyDelta.addAttribute("end-version-hash", Base64Util.encode(endVersion
- .getHistoryHash()));
- if (lengthLimit > 0) {
- historyDelta.addAttribute("response-length-limit", Long
- .toString(lengthLimit));
- }
- try {
- historyDelta.addAttribute("wavelet-name",
- XmppUtil.waveletNameCodec.waveletNameToURI(waveletName));
- } catch (EncodingException e) {
- listener.onFailure(
- FederationErrors.badRequest("Couldn't encode wavelet name " + waveletName));
- return;
- }
-
- final PacketCallback callback = new PacketCallback() {
- public void error(FederationError error) {
- listener.onFailure(error);
- }
-
- @Override
- public void run(Packet packet) {
- processHistoryResponse(packet, listener);
- }
- };
-
- disco.discoverRemoteJid(domain, new SuccessFailCallback<String, String>() {
- @Override
- public void onSuccess(String remoteJid) {
- Preconditions.checkNotNull(remoteJid);
- submitIq.setTo(remoteJid);
- manager.send(submitIq, callback, XMPP_PROVIDER_TIMEOUT);
- }
-
- @Override
- public void onFailure(String errorMessage) {
- listener.onFailure(FederationErrors.badRequest(
- "No such wave server " + domain + ": " + errorMessage));
- }
- });
- }
-
- @Override
- public void getDeltaSignerInfo(ByteString signerId, WaveletName waveletName,
- ProtocolHashedVersion deltaEndVersion,
- final DeltaSignerInfoResponseListener listener) {
- final IQ getSignerIq = new IQ(IQ.Type.get);
- getSignerIq.setID(XmppUtil.generateUniqueId());
-
- getSignerIq.setFrom(jid);
- // Extract domain from waveletId
- final String remoteDomain = waveletName.waveletId.getDomain();
- Element pubsub =
- getSignerIq.setChildElement("pubsub",
- XmppNamespace.NAMESPACE_PUBSUB);
- Element items = pubsub.addElement("items");
- items.addAttribute("node", "signer");
- // TODO: should allow multiple requests in the same packet
- Element signerRequest =
- items.addElement("signer-request",
- XmppNamespace.NAMESPACE_WAVE_SERVER);
- signerRequest.addAttribute("signer-id", Base64Util.encode(signerId));
- signerRequest.addAttribute("history-hash", Base64Util
- .encode(deltaEndVersion.getHistoryHash()));
- signerRequest.addAttribute("version", String.valueOf(deltaEndVersion
- .getVersion()));
- try {
- signerRequest.addAttribute("wavelet-name",
- XmppUtil.waveletNameCodec.waveletNameToURI(waveletName));
- } catch (EncodingException e) {
- listener.onFailure(FederationErrors.badRequest(
- "Couldn't encode wavelet name " + waveletName));
- return;
- }
-
- final PacketCallback callback = new PacketCallback() {
- @Override
- public void error(FederationError error) {
- listener.onFailure(error);
- }
-
- @Override
- public void run(Packet packet) {
- processGetSignerResponse(packet, listener);
- }
- };
-
- disco.discoverRemoteJid(
- remoteDomain, new SuccessFailCallback<String, String>() {
- @Override
- public void onSuccess(String remoteJid) {
- Preconditions.checkNotNull(remoteJid);
- getSignerIq.setTo(remoteJid);
- manager.send(getSignerIq, callback, XMPP_PROVIDER_TIMEOUT);
- }
-
- @Override
- public void onFailure(String errorMessage) {
- listener.onFailure(FederationErrors.badRequest(
- "No such wave server " + remoteDomain + ": " + errorMessage));
- }
- });
- }
-
- @Override
- public void postSignerInfo(
- final String remoteDomain,
- ProtocolSignerInfo signerInfo,
- final WaveletFederationProvider.PostSignerInfoResponseListener listener) {
- final IQ request = new IQ(IQ.Type.set);
- request.setID(XmppUtil.generateUniqueId());
-
- request.setFrom(jid);
- Element pubsub = request.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB);
- Element publish = pubsub.addElement("publish");
- publish.addAttribute("node", "signer");
- XmppUtil.protocolSignerInfoToXml(signerInfo, publish.addElement("item"));
-
- final PacketCallback callback = new PacketCallback() {
- @Override
- public void error(FederationError error) {
- listener.onFailure(error);
- }
-
- @Override
- public void run(Packet packet) {
- processPostSignerResponse(packet, listener);
- }
- };
-
- disco.discoverRemoteJid(
- remoteDomain, new SuccessFailCallback<String, String>() {
- @Override
- public void onSuccess(String remoteJid) {
- Preconditions.checkNotNull(remoteJid);
- request.setTo(remoteJid);
- manager.send(request, callback, XMPP_PROVIDER_TIMEOUT);
- }
-
- @Override
- public void onFailure(String errorMessage) {
- listener.onFailure(FederationErrors.badRequest(
- "No such wave server " + remoteDomain + ": " + errorMessage));
- }
- });
- }
-
- /**
- * Handles a wavelet update message from a foreign Federation Host. Passes the
- * message to the local waveserver (synchronously) and replies.
- *
- * @param updateMessage the incoming XMPP message.
- * @param responseCallback response callback for acks and errors
- */
- public void update(final Message updateMessage, final PacketCallback responseCallback) {
- final Element receiptRequested =
- updateMessage.getChildElement("request", XmppNamespace.NAMESPACE_XMPP_RECEIPTS);
-
- // Check existence of <event>
- Element event = updateMessage.getChildElement("event", XmppNamespace.NAMESPACE_PUBSUB_EVENT);
- if (event == null) {
- responseCallback.error(FederationErrors.badRequest("Event element missing from message"));
- return;
- }
-
- // Check existence of <items> within <event>
- Element items = event.element("items");
- if (items == null) {
- responseCallback.error(FederationErrors.badRequest(
- "Items element missing from update message"));
- return;
- }
-
- // Complain if no items have been included.
- List<Element> elements = XmppUtil.toSafeElementList(items.elements("item"));
- if (elements.isEmpty()) {
- responseCallback.error(FederationErrors.badRequest("No items included"));
- return;
- }
-
- // Create a callback latch counter and corresponding countDown runnable.
- // When the latch reaches zero, send receipt (if it was requested).
- final AtomicInteger callbackCount = new AtomicInteger(1);
- final Runnable countDown = new Runnable() {
- @Override
- public void run() {
- if (callbackCount.decrementAndGet() == 0 && receiptRequested != null) {
- Message response = XmppUtil.createResponseMessage(updateMessage);
- response.addChildElement("received", XmppNamespace.NAMESPACE_XMPP_RECEIPTS);
- responseCallback.run(response);
- }
- }
- };
-
- WaveletFederationListener.WaveletUpdateCallback callback =
- new WaveletFederationListener.WaveletUpdateCallback() {
- @Override
- public void onSuccess() {
- countDown.run();
- }
-
- @Override
- public void onFailure(FederationError error) {
- // Note that we don't propogate the error, we just ack the stanza
- // and continue.
- // TODO(thorogood): We may want to rate-limit misbehaving servers
- // that are sending us invalid/malicious data.
- LOG.warning("Incoming XMPP waveletUpdate failure: " + error);
- countDown.run();
- }
- };
-
- // We must call callback once on every iteration to ensure that we send
- // response if receiptRequested != null.
- for (Element item : elements) {
- Element waveletUpdate = item.element("wavelet-update");
-
- if (waveletUpdate == null) {
- callback.onFailure(FederationErrors.badRequest(
- "wavelet-update element missing from message: " + updateMessage));
- continue;
- }
-
- final WaveletName waveletName;
- try {
- waveletName = XmppUtil.waveletNameCodec.uriToWaveletName(
- waveletUpdate.attributeValue("wavelet-name"));
- } catch (EncodingException e) {
- callback.onFailure(FederationErrors.badRequest(
- "Couldn't decode wavelet name: " + waveletUpdate.attributeValue("wavelet-name")));
- continue;
- }
-
- WaveletFederationListener listener =
- updatesListenerFactory.listenerForDomain(waveletName.waveletId.getDomain());
-
- // Submit all applied deltas to the domain-focused listener.
- ImmutableList.Builder<ByteString> builder = ImmutableList.builder();
- for (Element appliedDeltaElement :
- XmppUtil.toSafeElementList(waveletUpdate.elements("applied-delta"))) {
- builder.add(Base64Util.decode(appliedDeltaElement.getText()));
- }
- ImmutableList<ByteString> deltas = builder.build();
- if (!deltas.isEmpty()) {
- callbackCount.incrementAndGet(); // Increment required callbacks.
- listener.waveletDeltaUpdate(waveletName, deltas, callback);
- }
-
- // Optionally submit any received last committed notice.
- Element commitNoticeElement = waveletUpdate.element("commit-notice");
- if (commitNoticeElement != null) {
- ProtocolHashedVersion version = ProtocolHashedVersion.newBuilder()
- .setHistoryHash(Base64Util.decode(commitNoticeElement.attributeValue("history-hash")))
- .setVersion(Long.parseLong(commitNoticeElement.attributeValue("version"))).build();
- callbackCount.incrementAndGet(); // Increment required callbacks.
- listener.waveletCommitUpdate(waveletName, version, callback);
- }
- }
-
- // Release sentinel so that 'expected' callbacks from the WS don't invoke
- // sending a receipt.
- countDown.run();
- }
-
- /**
- * Parses the response to a submitRequest and passes the result to the correct
- * wave server.
- *
- * @param result the XMPP Packet
- * @param listener the listener to invoke with the response.
- */
- private void processSubmitResponse(Packet result, SubmitResultListener listener) {
- Element publish = null;
- Element item = null;
- Element submitResponse = null;
- Element hashedVersionElement = null;
- Element pubsub = ((IQ) result).getChildElement();
- if (pubsub != null) {
- publish = pubsub.element("publish");
- if (publish != null) {
- item = publish.element("item");
- if (item != null) {
- submitResponse = item.element("submit-response");
- if (submitResponse != null) {
- hashedVersionElement = submitResponse.element("hashed-version");
- }
- }
- }
- }
-
- if (pubsub == null || publish == null || item == null
- || submitResponse == null || hashedVersionElement == null
- || hashedVersionElement.attribute("history-hash") == null
- || hashedVersionElement.attribute("version") == null
- || submitResponse.attribute("application-timestamp") == null
- || submitResponse.attribute("operations-applied") == null) {
- LOG.severe("Unexpected submitResponse to submit request: " + result);
- listener.onFailure(FederationErrors.badRequest("Invalid submitResponse: " + result));
- return;
- }
-
- ProtocolHashedVersion.Builder hashedVersion = ProtocolHashedVersion.newBuilder();
- hashedVersion.setHistoryHash(
- Base64Util.decode(hashedVersionElement.attributeValue("history-hash")));
- hashedVersion.setVersion(Long.parseLong(hashedVersionElement.attributeValue("version")));
- long applicationTimestamp =
- Long.parseLong(submitResponse.attributeValue("application-timestamp"));
- int operationsApplied = Integer.parseInt(submitResponse.attributeValue("operations-applied"));
- listener.onSuccess(operationsApplied, hashedVersion.build(), applicationTimestamp);
- }
-
- /**
- * Parses a response to a history request and passes the result to the wave
- * server.
- *
- * @param historyResponse the XMPP packet
- * @param listener interface to the wave server
- */
- @SuppressWarnings("unchecked")
- private void processHistoryResponse(Packet historyResponse,
- WaveletFederationProvider.HistoryResponseListener listener) {
- Element pubsubResponse = historyResponse.getElement().element("pubsub");
- Element items = pubsubResponse.element("items");
- long versionTruncatedAt = -1;
- long lastCommittedVersion = -1;
- List<ByteString> deltaList = Lists.newArrayList();
-
- if (items != null) {
- for (Element itemElement : (List<Element>) items.elements()) {
- for (Element element : (List<Element>) itemElement.elements()) {
- String elementName = element.getQName().getName();
- switch (elementName) {
- case "applied-delta":
- String deltaBody = element.getText();
- deltaList.add(ByteString.copyFrom(Base64.decodeBase64(deltaBody.getBytes())));
- break;
- case "commit-notice":
- Attribute commitVersion = element.attribute("version");
- if (commitVersion != null) {
- try {
- lastCommittedVersion = Long.parseLong(commitVersion.getValue());
- } catch (NumberFormatException e) {
- lastCommittedVersion = -1;
- }
- }
- break;
- case "history-truncated":
- Attribute truncVersion = element.attribute("version");
- if (truncVersion != null) {
- try {
- versionTruncatedAt = Long.parseLong(truncVersion.getValue());
- } catch (NumberFormatException e) {
- versionTruncatedAt = -1;
- }
- }
- break;
- default:
- listener.onFailure(FederationErrors.badRequest(
- "Bad response packet: " + historyResponse));
- break;
- }
- }
- }
- } else {
- listener.onFailure(FederationErrors.badRequest("Bad response packet: " + historyResponse));
- }
-
- final ProtocolHashedVersion lastCommitted;
- if (lastCommittedVersion > -1) {
- // TODO(thorogood): fedone doesn't send a history hash, and it's arguable
- // that it's even sane to include it.
- // Can't set it to null - NPE
- lastCommitted =
- ProtocolHashedVersion.newBuilder()
- .setVersion(lastCommittedVersion).setHistoryHash(ByteString.EMPTY)
- .build();
- } else {
- lastCommitted = null;
- }
- listener.onSuccess(deltaList, lastCommitted, versionTruncatedAt);
- }
-
- /**
- * Parses a GetSigner response, passes result to the waveserver.
- *
- * @param packet the response packet
- * @param listener the interface to the wave server
- */
- private void processGetSignerResponse(Packet packet, DeltaSignerInfoResponseListener listener) {
- IQ response = (IQ) packet;
- Element items = response.getChildElement().element("items");
- Element signature = items.element("signature");
- if (signature == null) {
- LOG.severe("Empty getDeltaSignerRequest response: " + response);
- listener.onFailure(FederationErrors.badRequest("Bad getDeltaSignatureRequest response"));
- return;
- }
- String domain = signature.attributeValue("domain");
- String hashName = signature.attributeValue("algorithm");
- if (domain == null || hashName == null || signature.element("certificate") == null) {
- LOG.severe("Bad getDeltaSignerRequest response: " + response);
- listener.onFailure(FederationErrors.badRequest("Bad getDeltaSignatureRequest response"));
- return;
- }
- ProtocolSignerInfo signer;
- try {
- signer = XmppUtil.xmlToProtocolSignerInfo(signature);
- } catch (UnknownSignerType e) {
- listener.onFailure(FederationErrors.badRequest(e.toString()));
- return;
- }
- listener.onSuccess(signer);
- }
-
- /**
- * Parses a response to a PostSigner request, passes result to wave server.
- *
- * @param packet the response XMPP packet
- * @param listener the listener to invoke
- */
- private void processPostSignerResponse(
- Packet packet,
- WaveletFederationProvider.PostSignerInfoResponseListener listener) {
- IQ response = (IQ) packet;
- Element pubsub = response.getChildElement();
- Element item = pubsub.element("publish").element("item");
- if (item.element("signature-response") != null) {
- listener.onSuccess();
- } else {
- listener.onFailure(FederationErrors.badRequest("No valid response"));
- }
- }
-}
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationTransport.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationTransport.java
deleted file mode 100644
index bb4e654..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationTransport.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.inject.Inject;
-
-import org.waveprotocol.wave.federation.FederationTransport;
-import org.waveprotocol.wave.util.logging.Log;
-import org.xmpp.component.ComponentException;
-
-/**
- * An implementation of {@link FederationManger} for XMPP federation.
- *
- * @author tad.glines@gmail.com (Tad Glines)
- */
-public class XmppFederationTransport implements FederationTransport {
- private static final Log LOG = Log.get(XmppFederationTransport.class);
- private final ComponentPacketTransport transport;
-
- @Inject
- XmppFederationTransport(ComponentPacketTransport transport) {
- this.transport = transport;
- }
-
- @Override
- public void startFederation() {
- try {
- transport.run();
- } catch (ComponentException e) {
- LOG.warning("couldn't connect to XMPP server:", e);
- }
- }
-}
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppManager.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppManager.java
deleted file mode 100644
index e0f2fb7..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppManager.java
+++ /dev/null
@@ -1,474 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.MapMaker;
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
-import org.dom4j.Element;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.waveprotocol.wave.federation.FederationErrors;
-import org.xmpp.packet.IQ;
-import org.xmpp.packet.Message;
-import org.xmpp.packet.Packet;
-import org.xmpp.packet.PacketError;
-
-import java.util.concurrent.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Provides abstraction between Federation-specific code and the backing XMPP
- * transport, including support for reliable outgoing calls (i.e. calls that are
- * guaranteed to time out) and sending error responses.
- *
- * TODO(thorogood): Find a better name for this class. Suggestions include
- * PacketHandler, Switchbox, TransportConnector, ReliableRouter, ...
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public class XmppManager implements IncomingPacketHandler {
- private static final Logger LOG = Logger.getLogger(XmppManager.class.getCanonicalName());
-
- /**
- * Inner static class representing a single outgoing call.
- */
- private static class OutgoingCall {
- final Class<? extends Packet> responseType;
- PacketCallback callback;
- ScheduledFuture<?> timeout;
-
- OutgoingCall(Class<? extends Packet> responseType, PacketCallback callback) {
- this.responseType = responseType;
- this.callback = callback;
- }
-
- void start(ScheduledFuture<?> timeout) {
- Preconditions.checkState(this.timeout == null);
- this.timeout = timeout;
- }
- }
-
- /**
- * Inner non-static class representing a single incoming call. These are not
- * cancellable and do not time out; this is just a helper class so success and
- * failure responses may be more cleanly invoked.
- */
- private class IncomingCallback implements PacketCallback {
- private final Packet request;
- private boolean complete = false;
-
- IncomingCallback(Packet request) {
- this.request = request;
- }
-
- @Override
- public void error(FederationError error) {
- Preconditions.checkState(!complete,
- "Must not callback multiple times for incoming packet: %s", request);
- complete = true;
- sendErrorResponse(request, error);
- }
-
- @Override
- public void run(Packet response) {
- Preconditions.checkState(!complete,
- "Must not callback multiple times for incoming packet: %s", request);
- // TODO(thorogood): Check outgoing response versus stored incoming request
- // to ensure that to/from are paired correctly?
- complete = true;
- transport.sendPacket(response);
- }
- }
-
- // Injected types that handle incoming XMPP packet types.
- private final XmppFederationHost host;
- private final XmppFederationRemote remote;
- private final XmppDisco disco;
- private final OutgoingPacketTransport transport;
- private final String jid;
-
- // Pending callbacks to outgoing requests.
- private final ConcurrentMap<String, OutgoingCall> callbacks = new MapMaker().makeMap();
- private final ScheduledExecutorService timeoutExecutor =
- Executors.newSingleThreadScheduledExecutor();
-
- @Inject
- public XmppManager(XmppFederationHost host, XmppFederationRemote remote, XmppDisco disco,
- OutgoingPacketTransport transport, Config config) {
- this.host = host;
- this.remote = remote;
- this.disco = disco;
- this.transport = transport;
- this.jid = config.getString("federation.xmpp_jid");
-
- // Configure all related objects with this manager. Eventually, this should
- // be replaced by better Guice interface bindings.
- host.setManager(this);
- remote.setManager(this);
- disco.setManager(this);
- }
-
- @Override
- public void receivePacket(final Packet packet) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Received incoming XMPP packet:\n" + packet);
- }
-
- if (packet instanceof IQ) {
- IQ iq = (IQ) packet;
- if (iq.getType().equals(IQ.Type.result) || iq.getType().equals(IQ.Type.error)) {
- // Result type, hand off to callback handler.
- response(packet);
- } else {
- processIqGetSet(iq);
- }
- } else if (packet instanceof Message) {
- Message message = (Message) packet;
- if (message.getType().equals(Message.Type.error)
- || message.getChildElement("received", XmppNamespace.NAMESPACE_XMPP_RECEIPTS) != null) {
- // Response type, hand off to callback handler.
- response(packet);
- } else {
- processMessage(message);
- }
- } else {
- sendErrorResponse(packet, FederationError.Code.BAD_REQUEST, "Unhandled packet type: "
- + packet.getElement().getQName().getName());
- }
- }
-
- /**
- * Populate the given request subclass of Packet and return it.
- */
- private <V extends Packet> V createRequest(V packet, String toJid) {
- packet.setTo(toJid);
- packet.setID(XmppUtil.generateUniqueId());
- packet.setFrom(jid);
- return packet;
- }
-
- /**
- * Create a request IQ stanza with the given toJid.
- *
- * @param toJid target JID
- * @return new IQ stanza
- */
- public IQ createRequestIQ(String toJid) {
- return createRequest(new IQ(), toJid);
- }
-
- /**
- * Create a request Message stanza with the given toJid.
- *
- * @param toJid target JID
- * @return new Message stanza
- */
- public Message createRequestMessage(String toJid) {
- return createRequest(new Message(), toJid);
- }
-
- /**
- * Sends the given XMPP packet over the backing transport. This accepts a
- * callback which is guaranteed to be invoked at a later point, either through
- * a normal response, error response, or timeout.
- *
- * @param packet packet to be sent
- * @param callback callback to be invoked on response or timeout
- * @param timeout timeout, in seconds, for this callback
- */
- public void send(Packet packet, final PacketCallback callback, int timeout) {
- final String key = packet.getID() + "#" + packet.getTo() + "#" + packet.getFrom();
-
- final OutgoingCall call = new OutgoingCall(packet.getClass(), callback);
- if (callbacks.putIfAbsent(key, call) == null) {
- // Timeout runnable to be invoked on packet expiry.
- Runnable timeoutTask = new Runnable() {
- @Override
- public void run() {
- if (callbacks.remove(key, call)) {
- callback.error(
- FederationErrors.newFederationError(FederationError.Code.REMOTE_SERVER_TIMEOUT));
- } else {
- // Likely race condition where success has actually occurred. Ignore.
- }
- }
- };
- call.start(timeoutExecutor.schedule(timeoutTask, timeout, TimeUnit.SECONDS));
- transport.sendPacket(packet);
- } else {
- String msg = "Could not send packet, ID already in-flight: " + key;
- LOG.warning(msg);
-
- // Invoke the callback with an internal error.
- callback.error(
- FederationErrors.newFederationError(FederationError.Code.UNDEFINED_CONDITION, msg));
- }
- }
-
- /**
- * Cause an immediate timeout for the given packet, which is presumed to have
- * already been sent via {@link #send}.
- */
- @VisibleForTesting
- void causeImmediateTimeout(Packet packet) {
- String key = packet.getID() + "#" + packet.getTo() + "#" + packet.getFrom();
- OutgoingCall call = callbacks.remove(key);
- if (call != null) {
- call.callback.error(FederationErrors.newFederationError(
- FederationError.Code.REMOTE_SERVER_TIMEOUT, "Forced immediate timeout"));
- }
- }
-
- /**
- * Invoke the callback for a packet already identified as a response. This may
- * either invoke the error or normal callback as necessary.
- */
- private void response(Packet packet) {
- String key = packet.getID() + "#" + packet.getFrom() + "#" + packet.getTo();
- OutgoingCall call = callbacks.remove(key);
-
- if (call == null) {
- LOG.warning("Received response packet without paired request: " + packet.getID());
- } else {
- // Cancel the outstanding timeout.
- call.timeout.cancel(false);
-
- // Look for error condition and invoke the relevant callback.
- Element element = packet.getElement().element("error");
- if (element != null) {
- LOG.fine("Invoking error callback for: " + packet.getID());
- call.callback.error(toFederationError(new PacketError(element)));
- } else {
- if (call.responseType.equals(packet.getClass())) {
- LOG.fine("Invoking normal callback for: " + packet.getID());
- call.callback.run(packet);
- } else {
- String msg =
- "Received mismatched response packet type: expected " + call.responseType
- + ", given " + packet.getClass();
- LOG.warning(msg);
- call.callback.error(FederationErrors.newFederationError(
- FederationError.Code.UNDEFINED_CONDITION, msg));
- }
- }
-
- // Clear call's reference to callback, otherwise callback only
- // becomes eligible for GC once the timeout expires, because
- // timeoutExecutor holds on to the call object till then, even
- // though we cancelled the timeout.
- call.callback = null;
- }
- }
-
- /**
- * Process IQ request stanzas. This encompasses XMPP disco, submit and history
- * requests/responses, and get/post signer info requests/responses.
- */
- private void processIqGetSet(IQ iq) {
- Element body = iq.getChildElement();
- if (body == null) {
- sendErrorResponse(iq, FederationErrors.badRequest("Malformed request, no IQ child"));
- return;
- }
-
- final String namespace = body.getQName().getNamespace().getURI();
- final boolean isIQSet;
- if (iq.getType().equals(IQ.Type.get)) {
- isIQSet = false;
- } else if (iq.getType().equals(IQ.Type.set)) {
- isIQSet = true;
- } else {
- throw new IllegalArgumentException("Can only process an IQ get/set.");
- }
- PacketCallback responseCallback = new IncomingCallback(iq);
-
- if (namespace.equals(XmppNamespace.NAMESPACE_PUBSUB)) {
- final Element pubsub = iq.getChildElement();
- final Element element = pubsub.element(isIQSet ? "publish" : "items");
-
- if (element.attributeValue("node").equals("wavelet")) {
- if (isIQSet) {
- host.processSubmitRequest(iq, responseCallback);
- } else {
- host.processHistoryRequest(iq, responseCallback);
- }
- } else if (element.attributeValue("node").equals("signer")) {
- if (isIQSet) {
- host.processPostSignerRequest(iq, responseCallback);
- } else {
- host.processGetSignerRequest(iq, responseCallback);
- }
- } else {
- sendErrorResponse(iq, FederationError.Code.BAD_REQUEST, "Unhandled pubsub request");
- }
- } else if (!isIQSet) {
- switch (namespace) {
- case XmppNamespace.NAMESPACE_DISCO_INFO:
- disco.processDiscoInfoGet(iq, responseCallback);
- break;
- case XmppNamespace.NAMESPACE_DISCO_ITEMS:
- disco.processDiscoItemsGet(iq, responseCallback);
- break;
- default:
- sendErrorResponse(iq, FederationError.Code.BAD_REQUEST, "Unhandled IQ get");
- break;
- }
- } else {
- sendErrorResponse(iq, FederationError.Code.BAD_REQUEST, "Unhandled IQ set");
- }
- }
-
- /**
- * Processes Message stanzas. This encompasses wavelet updates, update acks,
- * and ping messages.
- */
- private void processMessage(Message message) {
- if (message.getChildElement("event", XmppNamespace.NAMESPACE_PUBSUB_EVENT) != null) {
- remote.update(message, new IncomingCallback(message));
- } else if (message.getChildElement("ping", XmppNamespace.NAMESPACE_WAVE_SERVER) != null) {
- // Respond inline to the ping.
- LOG.info("Responding to ping from: " + message.getFrom());
- Message response = XmppUtil.createResponseMessage(message);
- response.addChildElement("received", XmppNamespace.NAMESPACE_XMPP_RECEIPTS);
- transport.sendPacket(response);
- } else {
- sendErrorResponse(message, FederationError.Code.BAD_REQUEST, "Unhandled message type");
- }
- }
-
- /**
- * Helper method to send generic error responses, backed onto
- * {@link #sendErrorResponse(Packet, FederationError)}.
- */
- void sendErrorResponse(Packet request, FederationError.Code code) {
- sendErrorResponse(request, FederationErrors.newFederationError(code));
- }
-
- /**
- * Helper method to send error responses, backed onto
- * {@link #sendErrorResponse(Packet, FederationError)}.
- */
- void sendErrorResponse(Packet request, FederationError.Code code, String text) {
- sendErrorResponse(request, FederationErrors.newFederationError(code, text));
- }
-
- /**
- * Send an error request to the passed incoming request.
- *
- * @param request packet request, target is derived from its to/from
- * @param error error to be contained in response
- */
- void sendErrorResponse(Packet request, FederationError error) {
- if (error.getErrorCode() == FederationError.Code.OK) {
- throw new IllegalArgumentException("Can't send an error of OK!");
- }
- sendErrorResponse(request, toPacketError(error));
- }
-
- /**
- * Send an error response to the passed incoming request. Throws
- * IllegalArgumentException if the original packet is also an error, or is of
- * the IQ result type.
- *
- * According to RFC 3920 (9.3.1), the error packet may contain the original
- * packet. However, this implementation does not include it.
- *
- * @param request packet request, to/from is inverted for response
- * @param error packet error describing error condition
- */
- void sendErrorResponse(Packet request, PacketError error) {
- if (request instanceof IQ) {
- IQ.Type type = ((IQ) request).getType();
- if (!(type.equals(IQ.Type.get) || type.equals(IQ.Type.set))) {
- throw new IllegalArgumentException("May only return an error to IQ get/set, not: " + type);
- }
- } else if (request instanceof Message) {
- Message message = (Message) request;
- if (message.getType().equals(Message.Type.error)) {
- throw new IllegalArgumentException("Can't return an error to another message error");
- }
- } else {
- throw new IllegalArgumentException("Unexpected Packet subclass, expected Message/IQ: "
- + request.getClass());
- }
-
- LOG.fine("Sending error condition in response to " + request.getID() + ": "
- + error.getCondition().name());
-
- // Note that this does not include the original packet; just the ID.
- final Packet response = XmppUtil.createResponsePacket(request);
- response.setError(error);
-
- transport.sendPacket(response);
- }
-
- /**
- * Convert a FederationError instance to a PacketError. This may return
- * <undefined-condition> if the incoming error can't be understood.
- *
- * @param error the incoming error
- * @return a generated PacketError instance
- * @throws IllegalArgumentException if the OK error code is given
- */
- private static PacketError toPacketError(FederationError error) {
- Preconditions.checkArgument(error.getErrorCode() != FederationError.Code.OK);
-
- String tag = error.getErrorCode().name().toLowerCase().replace('_', '-');
- PacketError.Condition condition;
- try {
- condition = PacketError.Condition.fromXMPP(tag);
- } catch (IllegalArgumentException e) {
- condition = PacketError.Condition.undefined_condition;
- LOG.warning("Did not understand error condition, defaulting to: " + condition.name());
- }
- PacketError result = new PacketError(condition);
- if (error.hasErrorMessage()) {
- // TODO(thorogood): Hide this behind a flag so we don't always broadcast error cases.
- result.setText(error.getErrorMessage(), "en");
- }
- return result;
- }
-
- /**
- * Convert a PacketError instance to an internal FederationError. This may
- * return an error code of UNDEFINED_CONDITION if the incoming error can't be
- * understood.
- *
- * @param error the incoming PacketError
- * @return the generated FederationError instance
- */
- private static FederationError toFederationError(PacketError error) {
- String tag = error.getCondition().name().toUpperCase().replace('-', '_');
- FederationError.Code code;
- try {
- code = FederationError.Code.valueOf(tag);
- } catch (IllegalArgumentException e) {
- code = FederationError.Code.UNDEFINED_CONDITION;
- }
- FederationError.Builder builder = FederationError.newBuilder().setErrorCode(code);
- if (error.getText() != null) {
- builder.setErrorMessage(error.getText());
- }
- return builder.build();
- }
-}
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppNamespace.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppNamespace.java
deleted file mode 100644
index 7656a3b..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppNamespace.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-/**
- * Namespace definitions for the XMPP package.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-final class XmppNamespace {
-
- // Namespace definitions for packet types
- static final String NAMESPACE_XMPP_RECEIPTS = "urn:xmpp:receipts";
- static final String NAMESPACE_DISCO_INFO = "http://jabber.org/protocol/disco#info";
- static final String NAMESPACE_DISCO_ITEMS = "http://jabber.org/protocol/disco#items";
- static final String NAMESPACE_PUBSUB = "http://jabber.org/protocol/pubsub";
- static final String NAMESPACE_PUBSUB_EVENT = "http://jabber.org/protocol/pubsub#event";
- static final String NAMESPACE_WAVE_SERVER = "http://waveprotocol.org/protocol/0.2/waveserver";
-
- /**
- * Uninstantiable class.
- */
- private XmppNamespace() {
- }
-
-}
diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppUtil.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppUtil.java
deleted file mode 100644
index 7ca2971..0000000
--- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppUtil.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-
-import org.dom4j.Element;
-import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo;
-import org.waveprotocol.wave.model.id.IdURIEncoderDecoder;
-import org.waveprotocol.wave.util.escapers.jvm.JavaUrlCodec;
-import org.xmpp.packet.IQ;
-import org.xmpp.packet.Message;
-import org.xmpp.packet.Packet;
-
-import java.nio.ByteBuffer;
-import java.security.SecureRandom;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Common utility code for XMPP packet generation and parsing.
- */
-public class XmppUtil {
- private static final AtomicLong idSequenceNo = new AtomicLong(0);
- private static final Random random = new SecureRandom();
-
- public static final IdURIEncoderDecoder waveletNameCodec =
- new IdURIEncoderDecoder(new JavaUrlCodec());
-
- // If non-null, this fake unique ID will be returned from generateUniqueId()
- // rather than a random base64 string.
- @VisibleForTesting
- public static String fakeUniqueId = null;
-
- // Alternately, and better, this callable will be called each time an ID is needed, if non-null.
- @VisibleForTesting
- static Callable<String> fakeIdGenerator = null;
-
- private XmppUtil() {
- }
-
- /**
- * Helper method to translate from the XMPP package (1.4 without generics) to
- * type-safe element lists.
- */
- @SuppressWarnings({"cast", "unchecked", "rawtypes"})
- public static List<Element> toSafeElementList(List elements) {
- return (List<Element>) elements;
- }
-
- /**
- * Checked exception thrown by signer conversion code.
- */
- public static class UnknownSignerType extends Exception {
- public UnknownSignerType(String algorithm) {
- super(algorithm);
- }
-
- public UnknownSignerType(String algorithm, Throwable stacked) {
- super(algorithm, stacked);
- }
- }
-
- /**
- * Convert the signer information to XML and place the result within the
- * passed Element. This method should never fail.
- */
- public static void protocolSignerInfoToXml(ProtocolSignerInfo signerInfo, Element parent) {
- Element signature = parent.addElement("signature", XmppNamespace.NAMESPACE_WAVE_SERVER);
- signature.addAttribute("domain", signerInfo.getDomain());
- ProtocolSignerInfo.HashAlgorithm hashValue = signerInfo.getHashAlgorithm();
-
- signature.addAttribute("algorithm", hashValue.name());
- for (ByteString cert : signerInfo.getCertificateList()) {
- signature.addElement("certificate").addCDATA(Base64Util.encode(cert));
- }
- }
-
- /**
- * Convert the given Element to a signer information XML element.
- *
- * @throws UnknownSignerType when the given hash algorithm is not understood
- */
- public static ProtocolSignerInfo xmlToProtocolSignerInfo(Element signature)
- throws UnknownSignerType {
- ProtocolSignerInfo.HashAlgorithm hash;
- String algorithm = signature.attributeValue("algorithm").toUpperCase();
- try {
- hash = ProtocolSignerInfo.HashAlgorithm.valueOf(algorithm);
- } catch (IllegalArgumentException e) {
- throw new UnknownSignerType(algorithm, e);
- }
-
- ProtocolSignerInfo.Builder builder = ProtocolSignerInfo.newBuilder();
- builder.setHashAlgorithm(hash);
- builder.setDomain(signature.attributeValue("domain"));
- for (Element certElement : toSafeElementList(signature.elements("certificate"))) {
- builder.addCertificate(Base64Util.decode(certElement.getText()));
- }
- return builder.build();
- }
-
- /**
- * Convenience method to create a response {@link Message} instance based on
- * the passed request. Simply returns a new message instance with the same ID,
- * but with inverse to/from addresses.
- *
- * @param request the request message
- * @return the new response message
- */
- public static Message createResponseMessage(Message request) {
- Message response = new Message();
- response.setID(request.getID());
- response.setTo(request.getFrom());
- response.setFrom(request.getTo());
- return response;
- }
-
- /**
- * Convenience method to create a response {@link Packet} implementation from
- * the given source packet. This will return either an {@link IQ} or
- * {@link Message} depending on the passed type.
- *
- * @param request the request message
- * @return the new response message
- */
- public static Packet createResponsePacket(Packet request) {
- if (request instanceof Message) {
- return createResponseMessage((Message) request);
- } else if (request instanceof IQ) {
- return IQ.createResultIQ((IQ) request);
- } else {
- throw new IllegalArgumentException("Can't respond to unsupported packet type: "
- + request.getClass());
- }
- }
-
- /**
- * Generate a unique string identifier for use in stanzas.
- *
- * @return unique string identifier
- */
- public static String generateUniqueId() {
- if (fakeIdGenerator != null) {
- try {
- return fakeIdGenerator.call();
- } catch (Exception e) {
- // This is used in tests only.
- throw new RuntimeException(e);
- }
- }
- // TODO(arb): deprecate this.
- if (fakeUniqueId != null) {
- return fakeUniqueId;
- }
-
- // Generate a base64 ID based on raw bytes.
- byte[] bytes = ByteBuffer.allocate(16)
- .putLong(random.nextLong()).putLong(idSequenceNo.incrementAndGet()).array();
- return Base64Util.encode(bytes);
- }
-}
diff --git a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AccountStoreTest.java b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AccountStoreTest.java
index c1bd5f0..6868ee3 100644
--- a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AccountStoreTest.java
+++ b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AccountStoreTest.java
@@ -19,6 +19,7 @@
package org.waveprotocol.box.server.persistence.file;
+import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigFactory;
import org.waveprotocol.box.server.persistence.AccountStore;
import org.waveprotocol.box.server.persistence.AccountStoreTestBase;
@@ -42,7 +43,7 @@
@Override
protected AccountStore newAccountStore() {
return new FileAccountStore(
- ConfigFactory.parseString("core.account_store_directory : " + path.getAbsolutePath()));
+ ConfigFactory.parseMap (ImmutableMap.of("core.account_store_directory", path.getAbsolutePath())));
}
@Override
diff --git a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AttachmentStoreTest.java b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AttachmentStoreTest.java
index 6ace3ee..53a68d3 100644
--- a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AttachmentStoreTest.java
+++ b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AttachmentStoreTest.java
@@ -19,6 +19,7 @@
package org.waveprotocol.box.server.persistence.file;
+import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigFactory;
import org.waveprotocol.box.server.persistence.AttachmentStore;
import org.waveprotocol.box.server.persistence.AttachmentStoreTestBase;
@@ -41,7 +42,7 @@
@Override
protected AttachmentStore newAttachmentStore() {
return new FileAttachmentStore(
- ConfigFactory.parseString("core.attachment_store_directory : " + path.getAbsolutePath()));
+ ConfigFactory.parseMap (ImmutableMap.of("core.attachment_store_directory", path.getAbsolutePath())));
}
@Override
diff --git a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/CertPathStoreTest.java b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/CertPathStoreTest.java
index 0065d80..c10a774 100644
--- a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/CertPathStoreTest.java
+++ b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/CertPathStoreTest.java
@@ -19,6 +19,7 @@
package org.waveprotocol.box.server.persistence.file;
+import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigFactory;
import org.waveprotocol.box.server.persistence.CertPathStoreTestBase;
import org.waveprotocol.wave.crypto.CertPathStore;
@@ -46,7 +47,8 @@
@Override
protected CertPathStore newCertPathStore() {
- return new FileSignerInfoStore(ConfigFactory.parseString("core.signer_info_store_directory : " + path.getAbsolutePath()));
+ return new FileSignerInfoStore(ConfigFactory.parseMap (
+ ImmutableMap.of("core.signer_info_store_directory", path.getAbsolutePath())));
}
@Override
diff --git a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/DeltaStoreTest.java b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/DeltaStoreTest.java
index 1bc18ee..3dbfaef 100644
--- a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/DeltaStoreTest.java
+++ b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/DeltaStoreTest.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigFactory;
import org.waveprotocol.box.server.persistence.DeltaStoreTestBase;
import org.waveprotocol.box.server.waveserver.DeltaStore;
@@ -55,8 +56,8 @@
@Override
protected DeltaStore newDeltaStore() {
- return new FileDeltaStore(ConfigFactory.parseString("core.delta_store_directory : " + path
- .getAbsolutePath()));
+ return new FileDeltaStore(ConfigFactory.parseMap (
+ ImmutableMap.of("core.delta_store_directory", path.getAbsolutePath())));
}
@Override
diff --git a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockDisco.java b/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockDisco.java
deleted file mode 100644
index 5824cc8..0000000
--- a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockDisco.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.common.base.Function;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.MapMaker;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Tiny MockDisco class that wraps XmppDisco.
- *
- * Use {@link #testInjectInDomainToJidMap} to configure custom immediate responses, otherwise
- * responses will be placed on a pending queue.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public class MockDisco extends XmppDisco {
-
- private static final int FAIL_EXPIRY_SECS = 5 * 60;
- private static final int SUCCESS_EXPIRY_SECS = 2 * 60 * 60;
- private static final int DISCO_EXPIRY_HOURS = 6;
-
- public static final Config config;
-
- static {
- Map<String, Object> props = new HashMap<>();
- props.put("federation.xmpp_server_description", "Wave in a Box");
- props.put("federation.disco_info_category", "collaboration");
- props.put("federation.disco_info_type", "apache-wave");
- props.put("federation.xmpp_disco_failed_expiry", FAIL_EXPIRY_SECS + "s");
- props.put("federation.xmpp_disco_successful_expiry", SUCCESS_EXPIRY_SECS + "s");
- props.put("federation.disco_expiration", DISCO_EXPIRY_HOURS + "h");
-
- config = ConfigFactory.parseMap(props);
- }
-
- MockDisco() {
- super(config);
- }
-
- public static class PendingMockDisco {
- public final String remoteDomain;
- public final Queue<SuccessFailCallback<String, String>> callbacks = Lists.newLinkedList();
-
- private PendingMockDisco(String remoteDomain) {
- this.remoteDomain = remoteDomain;
- }
-
- private void addCallback(SuccessFailCallback<String, String> callback) {
- callbacks.add(callback);
- }
- }
-
- public LoadingCache<String, PendingMockDisco> pending = CacheBuilder.newBuilder()
- .build(new CacheLoader<String, PendingMockDisco>() {
- @Override
- public PendingMockDisco load(String domain) {
- return new PendingMockDisco(domain);
- }
- });
-
- @Override
- public void discoverRemoteJid(String remoteDomain, SuccessFailCallback<String, String> callback) {
- if (isDiscoRequestAvailable(remoteDomain)) {
- // Note: tiny race condition in case this is purged between above and
- // below, but since this is only used in tests, we can probably ignore it.
- super.discoverRemoteJid(remoteDomain, callback);
- } else {
- try {
- pending.get(remoteDomain).addCallback(callback);
- } catch (ExecutionException ex) {
- throw new RuntimeException(ex);
- }
- }
- }
-
-}
diff --git a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockOutgoingPacketTransport.java b/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockOutgoingPacketTransport.java
deleted file mode 100644
index 588fbe8..0000000
--- a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockOutgoingPacketTransport.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import org.xmpp.packet.Packet;
-
-import java.util.LinkedList;
-import java.util.Queue;
-
-/**
- * Dummy implementation of {@link OutgoingPacketTransport} that stores packets
- * being sent over-the-wire. May optionally accept a {@link Router} instance
- * where packets are automatically forwarded.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-public class MockOutgoingPacketTransport implements OutgoingPacketTransport {
-
- public interface Router {
- public void route(Packet packet);
- }
-
- // wrapped router object, if null then packets are not routed
- public Router router;
-
- // pending outgoing packets
- public final Queue<Packet> packets = new LinkedList<Packet>();
-
- // last packet sent
- public Packet lastPacketSent = null;
-
- // total number of packets sent here
- public long packetsSent = 0;
-
- public MockOutgoingPacketTransport() {
- router = null;
- }
-
- public MockOutgoingPacketTransport(Router router) {
- this.router = router;
- }
-
- @Override
- public void sendPacket(Packet packet) {
- if (!packets.offer(packet)) {
- throw new IllegalStateException("Can't offer packet to queue: " + packets);
- }
- lastPacketSent = packet;
- packetsSent++;
-
- if (router != null) {
- router.route(packet);
- }
- }
-
-}
diff --git a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RemoteDiscoTest.java b/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RemoteDiscoTest.java
deleted file mode 100644
index 2668a74..0000000
--- a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RemoteDiscoTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import org.xmpp.packet.IQ;
-import org.joda.time.DateTimeUtils;
-import org.waveprotocol.wave.federation.FederationErrors;
-
-
-import junit.framework.TestCase;
-
-/**
- * Performs naive tests over RemoteDisco. Integration testing is performed in
- * {@link XmppDiscoTest}.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-
-public class RemoteDiscoTest extends TestCase {
-
- private final static String REMOTE_DOMAIN = "acmewave.com";
- private final static String REMOTE_JID = "wave.acmewave.com";
- private static final int SUCCESS_EXPIRY_SECS = 600;
- private static final int FAIL_EXPIRY_SECS = 120;
- private RemoteDisco remoteDisco;
- private SuccessFailCallback<String, String> callback;
-
- protected void setUp() throws Exception {
- super.setUp();
- XmppManager manager = mock(XmppManager.class);
- when(manager.createRequestIQ(eq(REMOTE_DOMAIN))).thenReturn(new IQ());
-
- DateTimeUtils.setCurrentMillisFixed(0);
- remoteDisco = new RemoteDisco(manager, REMOTE_DOMAIN, FAIL_EXPIRY_SECS,
- SUCCESS_EXPIRY_SECS);
- callback = mockDiscoCallback();
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- DateTimeUtils.setCurrentMillisSystem();
- }
-
- @SuppressWarnings("unchecked")
- private SuccessFailCallback<String, String> mockDiscoCallback() {
- return mock(SuccessFailCallback.class);
- }
-
- /**
- * Test a RemoteDisco created with a forced success case.
- */
- public void testForcedSuccess() {
- RemoteDisco remoteDisco = new RemoteDisco(REMOTE_DOMAIN, REMOTE_JID, null);
-
- SuccessFailCallback<String, String> callback = mockDiscoCallback();
- remoteDisco.discoverRemoteJID(callback);
- verify(callback).onSuccess(eq(REMOTE_JID));
- verify(callback, never()).onFailure(anyString());
- }
-
- /**
- * Test a RemoteDisco created with a forced failure case.
- */
- public void testForcedFailure() {
- RemoteDisco remoteDisco = new RemoteDisco(REMOTE_DOMAIN, null,
- FederationErrors.badRequest("irrelevant"));
-
- SuccessFailCallback<String, String> callback = mockDiscoCallback();
- remoteDisco.discoverRemoteJID(callback);
- verify(callback, never()).onSuccess(anyString());
- verify(callback).onFailure(anyString());
- callback = mockDiscoCallback();
- remoteDisco.discoverRemoteJID(callback);
- verify(callback, never()).onSuccess(anyString());
- verify(callback).onFailure(anyString());
- }
-
- /**
- * Tests the disco expiry code for successful disco results.
- */
- public void testTimeToLive() {
- remoteDisco.discoverRemoteJID(callback);
- assertFalse(remoteDisco.ttlExceeded());
- remoteDisco.finish(REMOTE_JID, null); // successful disco
- assertFalse(remoteDisco.ttlExceeded());
- tick((SUCCESS_EXPIRY_SECS - 1) * 1000); // not quite expired
- assertFalse(remoteDisco.ttlExceeded());
- tick(20 * 1000); // should now be expired
- assertTrue(remoteDisco.ttlExceeded());
- }
-
- /**
- * Tests the disco expiry code for failed disco results.
- */
- public void testTimeToLiveDiscoFailed() {
- remoteDisco.discoverRemoteJID(callback);
- assertFalse(remoteDisco.ttlExceeded());
- remoteDisco.finish(null, FederationErrors.badRequest("test failure")); // failed disco
- assertFalse(remoteDisco.ttlExceeded());
- tick((FAIL_EXPIRY_SECS - 1) * 1000); // not quite expired
- assertFalse(remoteDisco.ttlExceeded());
- tick(20 * 1000); // should now be expired
- assertTrue(remoteDisco.ttlExceeded());
- }
-
- /**
- * Advance the clock.
- *
- * @param millis milliseconds to advance clock
- */
- private void tick(int millis) {
- DateTimeUtils.setCurrentMillisFixed(DateTimeUtils.currentTimeMillis() + millis);
- }
-}
diff --git a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RoundTripTest.java b/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RoundTripTest.java
deleted file mode 100644
index fc928a6..0000000
--- a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RoundTripTest.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-
-
-import com.typesafe.config.ConfigFactory;
-import junit.framework.TestCase;
-
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.xmpp.packet.IQ;
-import org.xmpp.packet.Message;
-import org.xmpp.packet.Packet;
-import org.xmpp.packet.PacketError;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.waveprotocol.wave.federation.FederationErrors;
-import org.waveprotocol.wave.federation.xmpp.MockOutgoingPacketTransport.Router;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Test round-trips between two XmppManager instances pointed at each other.
- *
- * This class is not intended to test specific calls; it is primary to test
- * reliable calls made by the manager along with error handling. Any specific
- * call coverage is purely a side-effect of wanting real test data.
- *
- * @author thorogood@google.com (Sam Thorogood)
- */
-
-public class RoundTripTest extends TestCase {
-
- private static final String SERVER1_DOMAIN = "google.com";
- private static final String SERVER2_DOMAIN = "acmewave.com";
-
- private static final int PACKET_TIMEOUT = 10;
-
- private static class ServerInstances {
- final String jid;
- final XmppManager manager;
- final XmppFederationHost host;
- final XmppFederationRemote remote;
- final XmppDisco disco;
- final MockOutgoingPacketTransport transport;
-
- ServerInstances(String domain, MockOutgoingPacketTransport.Router router) {
- // Mocks.
- host = mock(XmppFederationHost.class);
- remote = mock(XmppFederationRemote.class);
- disco = mock(XmppDisco.class);
-
- // 'Real' instantiated classes!
- jid = "wave." + domain;
- transport = new MockOutgoingPacketTransport(router);
-
- final Map<String, Object> props = new HashMap<>();
- props.put("federation.xmpp_disco_successful_expiry", "6s");
- props.put("federation.xmpp_jid", jid);
- manager = new XmppManager(
- host, remote, disco, transport, ConfigFactory.parseMap(props));
-
- // Verify manager callback.
- verify(host).setManager(eq(manager));
- verify(remote).setManager(eq(manager));
- verify(disco).setManager(eq(manager));
- }
- }
-
- private ServerInstances server1;
- private ServerInstances server2;
-
- @Override
- public void setUp() throws Exception {
- super.setUp();
-
- server1 = new ServerInstances(SERVER1_DOMAIN, new Router() {
- @Override
- public void route(Packet packet) {
- server2.manager.receivePacket(packet);
- }
- });
- server2 = new ServerInstances(SERVER2_DOMAIN, new Router() {
- @Override
- public void route(Packet packet) {
- server1.manager.receivePacket(packet);
- }
- });
- }
-
- /**
- * Test the simple case of packet send/receive by sending a malformed request.
- */
- public void testPacketSendMalformedFailure() {
- Packet packet = new IQ();
- packet.setFrom(server1.jid);
- packet.setID("irrelevant");
- packet.setTo(server2.jid);
-
- PacketCallback callback = mock(PacketCallback.class);
-
- // Send an outgoing packet from server1 -> server2
- server1.manager.send(packet, callback, PACKET_TIMEOUT);
- assertEquals("First transport should have a single packet pending",
- 1, server1.transport.packets.size());
- assertEquals("First transport should have unmodified outgoing packet",
- packet, server1.transport.packets.peek());
-
- // Confirm that server2 sent back an error
- assertEquals("Second transport should have a single packet pending",
- 1, server2.transport.packets.size());
- assertNotNull("Second transport should be an error packet",
- server2.transport.packets.peek().getError());
-
- // Ensure the error is interpreted correctly and returned to the callback
- ArgumentCaptor<FederationError> errorCaptor = ArgumentCaptor.forClass(FederationError.class);
- verify(callback).error(errorCaptor.capture());
- verify(callback, never()).run(any(Packet.class));
- assertEquals("Invalid packet was sent, error should be BAD_REQUEST",
- FederationError.Code.BAD_REQUEST, errorCaptor.getValue().getErrorCode());
- }
-
- /**
- * Test the simple case of having a response invoked based entirely on the
- * timeout case.
- */
- public void testPacketTimeout() throws Exception {
- int TIMEOUT_DELAY = 0;
- int TIMEOUT_WAIT = 5;
-
- // Send a valid packet, so it is received by the remote Disco mock, but not processed.
- IQ packet = new IQ();
- packet.setFrom(server1.jid);
- packet.setID("disco");
- packet.setTo(server2.jid);
- packet.setType(IQ.Type.get);
- packet.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_ITEMS);
-
- PacketCallback callback = mock(PacketCallback.class);
- final CountDownLatch finished = new CountDownLatch(1);
-
- Mockito.doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- FederationError error = (FederationError) invocation.getArguments()[0];
- assertEquals(FederationError.Code.REMOTE_SERVER_TIMEOUT, error.getErrorCode());
- finished.countDown();
- return null;
- }
- }).when(callback).error(any(FederationError.class));
-
- server1.manager.send(packet, callback, TIMEOUT_DELAY);
- assertTrue(finished.await(TIMEOUT_WAIT, TimeUnit.SECONDS));
- verify(callback, never()).run(any(Packet.class));
-
- // Mockito says never to reset a mock, but we have to as the callback is
- // already used deep in XmppManager.
- Mockito.reset(callback);
-
- // For fun, process the request by the remote disco and return a response
- // that will never be processed.
- ArgumentCaptor<PacketCallback> server2Callback = ArgumentCaptor.forClass(PacketCallback.class);
- verify(server2.disco).processDiscoItemsGet(eq(packet), server2Callback.capture());
- XmppDisco realDisco = new XmppDisco(MockDisco.config);
- realDisco.setManager(server2.manager);
- realDisco.processDiscoItemsGet(packet, server2Callback.getValue());
-
- // Confirm disco on server2 has replied with a packet.
- assertEquals(1, server2.transport.packets.size());
- assertEquals(null, server2.transport.packets.peek().getError());
-
- // Confirm, however, that the packet is dropped by the first manager (no pending call!).
- verifyZeroInteractions(callback);
- }
-
- /**
- * Test that an arbitrary error response is properly returned when generated
- * by the second server. Also ensure that the second server can't invoke its
- * callback twice.
- */
- public void testErrorResponse() {
- FederationError.Code TEST_CODE = FederationError.Code.NOT_AUTHORIZED;
- PacketError.Condition TEST_CONDITION = PacketError.Condition.not_authorized;
-
- // Send a valid packet, so it is received by the remote Disco mock, but not
- // explicitly processed.
- IQ packet = new IQ();
- packet.setFrom(server1.jid);
- packet.setID("disco");
- packet.setTo(server2.jid);
- packet.setType(IQ.Type.get);
- packet.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_ITEMS);
-
- PacketCallback callback = mock(PacketCallback.class);
- server1.manager.send(packet, callback, PACKET_TIMEOUT);
-
- // Accept the disco request and return TEST_CODE error.
- ArgumentCaptor<PacketCallback> server2Callback = ArgumentCaptor.forClass(PacketCallback.class);
- verify(server2.disco).processDiscoItemsGet(eq(packet), server2Callback.capture());
- server2Callback.getValue().error(FederationErrors.newFederationError(TEST_CODE));
-
- // Try to then complete the message, but cause an IllegalStateException.
- IQ fakeResponse = IQ.createResultIQ(packet);
- try {
- server2Callback.getValue().run(fakeResponse);
- fail("Should not be able to invoke callback twice");
- } catch (IllegalStateException e) {
- // pass
- }
-
- // Check the outgoing packet log.
- assertEquals(1, server2.transport.packets.size());
- Packet errorResponse = server2.transport.packets.peek();
- PacketError error = errorResponse.getError();
- assertNotNull(error);
- assertEquals(TEST_CONDITION, error.getCondition());
-
- // Assert that the error response does *not* include the original packet.
- assertTrue(errorResponse instanceof IQ);
- IQ errorIQ = (IQ) errorResponse;
- assertEquals(null, errorIQ.getChildElement());
-
- // Confirm that the error is received properly on the first server.
- ArgumentCaptor<FederationError> returnedError = ArgumentCaptor.forClass(FederationError.class);
- verify(callback).error(returnedError.capture());
- verify(callback, never()).run(any(Packet.class));
- assertEquals(TEST_CODE, returnedError.getValue().getErrorCode());
-
- // If we push the error again, it should be dropped. Note that resetting the
- // callback here is the simplest way to test this, since it is already
- // registered inside the manager.
- reset(callback);
- server1.manager.receivePacket(errorResponse);
- verifyZeroInteractions(callback);
- }
-
- /**
- * Test that an unhandled error (e.g. <forbidden>) is translated to
- * UNDEFINED_CONDITION before being returned to the mocked callback.
- */
- public void testUnhandledErrorResponse() {
- IQ packet = new IQ();
- packet.setFrom(server1.jid);
- packet.setID("foo");
- packet.setTo(server2.jid);
-
- // Disable routing so we can intercept the packet.
- server1.transport.router = null;
- PacketCallback callback = mock(PacketCallback.class);
- server1.manager.send(packet, callback, PACKET_TIMEOUT);
-
- // Generate an explicit error <forbidden>.
- IQ errorPacket = IQ.createResultIQ(packet);
- errorPacket.setError(PacketError.Condition.forbidden);
- server1.manager.receivePacket(errorPacket);
-
- // Confirm that <forbidden> is transformed to UNDEFINED_CONDITION.
- ArgumentCaptor<FederationError> returnedError = ArgumentCaptor.forClass(FederationError.class);
- verify(callback).error(returnedError.capture());
- verify(callback, never()).run(any(Packet.class));
- assertEquals(FederationError.Code.UNDEFINED_CONDITION, returnedError.getValue().getErrorCode());
- }
-
- /**
- * Test that packet IDs cannot be re-used while in-flight, and also that may
- * be re-used later.
- */
- public void testReusePacketId() throws Exception {
- int REUSE_FAIL_WAIT = 5;
-
- IQ packet = new IQ();
- packet.setFrom(server1.jid);
- packet.setID("foo-packet");
- packet.setTo(server2.jid);
-
- // Disable routing so we can intercept the packet.
- server1.transport.router = null;
- PacketCallback callback = mock(PacketCallback.class);
- server1.manager.send(packet, callback, PACKET_TIMEOUT);
- assertEquals(1, server1.transport.packets.size());
- assertEquals(packet, server1.transport.packets.poll());
-
- // Try sending another packet with the same ID - must fail (called back in
- // another thread)!
- PacketCallback invalidCallback = mock(PacketCallback.class);
- final CountDownLatch finished = new CountDownLatch(1);
- Mockito.doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- FederationError error = (FederationError) invocation.getArguments()[0];
- assertEquals(FederationError.Code.UNDEFINED_CONDITION, error.getErrorCode());
- finished.countDown();
- return null;
- }
- }).when(invalidCallback).error(any(FederationError.class));
-
- server1.manager.send(packet, invalidCallback, PACKET_TIMEOUT);
- assertTrue(finished.await(REUSE_FAIL_WAIT, TimeUnit.SECONDS));
- verify(invalidCallback, never()).run(any(Packet.class));
-
- // Generate an explicit success response.
- IQ successPacket = IQ.createResultIQ(packet);
- server1.manager.receivePacket(successPacket);
- verify(callback).run(eq(successPacket));
- verify(callback, never()).error(any(FederationError.class));
-
- // Again, re-use the ID: should succeed since it is cleared from callbacks.
- PacketCallback zeroCallback = mock(PacketCallback.class);
- server1.manager.send(packet, zeroCallback, PACKET_TIMEOUT);
- assertEquals(1, server1.transport.packets.size());
- assertEquals(packet, server1.transport.packets.poll());
- verifyZeroInteractions(zeroCallback);
- }
-
- /**
- * Test that if (e.g.) an IQ is sent, then an IQ must be returned as a
- * response. If a Message is returned instead, this should invoke an error
- * callback.
- */
- public void testDropInvalidResponseType() throws Exception {
- IQ packet = server1.manager.createRequestIQ(server2.jid);
-
- // Disable routing so we can intercept the packet.
- server1.transport.router = null;
- PacketCallback callback = mock(PacketCallback.class);
- server1.manager.send(packet, callback, PACKET_TIMEOUT);
-
- // Generate an explicit Message receipt.
- Message response = new Message();
- response.setTo(packet.getFrom());
- response.setID(packet.getID());
- response.setFrom(packet.getTo());
- response.addChildElement("received", XmppNamespace.NAMESPACE_XMPP_RECEIPTS);
- server1.manager.receivePacket(response);
-
- // Confirm that an error callback is invoked.
- ArgumentCaptor<FederationError> returnedError = ArgumentCaptor.forClass(FederationError.class);
- verify(callback).error(returnedError.capture());
- verify(callback, never()).run(any(Packet.class));
- assertEquals(FederationError.Code.UNDEFINED_CONDITION, returnedError.getValue().getErrorCode());
-
- // Confirm that sending a correct response now does nothing.
- reset(callback);
- IQ correctResponse = IQ.createResultIQ(packet);
- server1.manager.receivePacket(correctResponse);
- verifyZeroInteractions(callback);
- }
-
-}
diff --git a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/XmppDiscoTest.java b/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/XmppDiscoTest.java
deleted file mode 100644
index ef42b67..0000000
--- a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/XmppDiscoTest.java
+++ /dev/null
@@ -1,674 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-
-import com.google.common.collect.Lists;
-
-import com.typesafe.config.ConfigFactory;
-import junit.framework.TestCase;
-
-import org.dom4j.Element;
-import org.xmpp.packet.IQ;
-import org.xmpp.packet.Packet;
-import org.xmpp.packet.PacketError;
-import org.joda.time.DateTimeUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Tests for the {@link XmppDisco} class. Also provides coverage over
- * {@link RemoteDisco} which is used internally by XmppDisco.
- */
-
-
-public class XmppDiscoTest extends TestCase {
- private static final String LOCAL_DOMAIN = "something.com";
- private static final String LOCAL_JID = "wave." + LOCAL_DOMAIN;
- private static final String REMOTE_DOMAIN = "other.com";
- private static final String REMOTE_JID = "wave." + REMOTE_DOMAIN;
-
- private static final String DISCO_ITEMS_ID = "disco-items";
- private static final String DISCO_INFO_ID = "disco-info";
- private static final String SERVER_DESCRIPTION = "Wave in a Box";
-
- // The following JID is intentionally non-Wave.
- private static final String REMOTE_PUBSUB_JID = "pubsub." + REMOTE_DOMAIN;
-
- private static final String EXPECTED_DISCO_ITEMS_GET =
- "\n<iq type=\"get\" id=\"" + DISCO_ITEMS_ID + "\" to=\"" + REMOTE_DOMAIN + "\" "
- + "from=\"" + LOCAL_JID + "\">\n"
- + " <query xmlns=\"http://jabber.org/protocol/disco#items\"/>\n"
- + "</iq>";
-
- private static final String EXPECTED_DISCO_INFO_GET =
- "\n<iq type=\"get\" id=\"" + DISCO_INFO_ID + "\" to=\"" + REMOTE_JID + "\" "
- + "from=\"" + LOCAL_JID + "\">\n"
- + " <query xmlns=\"http://jabber.org/protocol/disco#info\"/>\n"
- + "</iq>";
-
- private static final String EXPECTED_DISCO_INFO_GET_PUBSUB =
- "\n<iq type=\"get\" id=\"" + DISCO_INFO_ID + "\" to=\"" + REMOTE_PUBSUB_JID + "\" "
- + "from=\"" + LOCAL_JID + "\">\n"
- + " <query xmlns=\"http://jabber.org/protocol/disco#info\"/>\n"
- + "</iq>";
-
- private static final String EXPECTED_DISCO_ITEMS_RESULT =
- "\n<iq type=\"result\" id=\"" + DISCO_ITEMS_ID + "\" from=\"" + LOCAL_JID + "\" "
- + "to=\"" + REMOTE_JID + "\">\n"
- + " <query xmlns=\"http://jabber.org/protocol/disco#items\"/>\n"
- + "</iq>";
-
- private static final String EXPECTED_DISCO_INFO_RESULT =
- "\n<iq type=\"result\" id=\""+ DISCO_INFO_ID + "\" from=\"" + LOCAL_JID + "\" "
- + "to=\"" + REMOTE_JID + "\">\n"
- + " <query xmlns=\"http://jabber.org/protocol/disco#info\">\n"
- + " <identity category=\"collaboration\" type=\"apache-wave\" "
- + "name=\"" + SERVER_DESCRIPTION + "\"/>\n"
- + " <feature var=\"http://waveprotocol.org/protocol/0.2/waveserver\"/>\n"
- + " </query>\n"
- + "</iq>";
-
- private MockOutgoingPacketTransport transport;
- private XmppManager manager;
- private XmppDisco disco;
-
- // Explicitly mocked out disco callback usable by individual tests.
- private SuccessFailCallback<String, String> discoCallback;
- private static final int DISCO_FAIL_EXPIRY_SECS = 5 * 60;
- private static final int DISCO_SUCCESS_EXPIRY_SECS = 2 * 60 * 60;
-
- private final AtomicLong counterStarted;
- private final AtomicLong counterSuccess;
- private final AtomicLong counterFailed;
-
- public XmppDiscoTest() throws ExecutionException {
- counterStarted = XmppDisco.statDiscoStarted.get(REMOTE_DOMAIN);
- counterSuccess = RemoteDisco.statDiscoSuccess.get(REMOTE_DOMAIN);
- counterFailed = RemoteDisco.statDiscoFailed.get(REMOTE_DOMAIN);
- }
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- disco = new XmppDisco(MockDisco.config);
- transport = new MockOutgoingPacketTransport();
- final Map<String, Object> props = new HashMap<>();
- props.put("federation.xmpp_jid", LOCAL_JID);
- manager = new XmppManager(mock(XmppFederationHost.class), mock(XmppFederationRemote.class),
- disco, transport, ConfigFactory.parseMap(props).withFallback(MockDisco.config));
- disco.setManager(manager);
- discoCallback = createMockCallback();
-
- resetVarz();
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- DateTimeUtils.setCurrentMillisSystem();
- }
-
- /**
- * Tests that starting disco sends a disco#items to the remote server.
- */
- public void testDiscoStart() {
- XmppUtil.fakeUniqueId = DISCO_ITEMS_ID;
- disco.discoverRemoteJid(REMOTE_DOMAIN, discoCallback);
- assertEquals(1, transport.packets.size());
- Packet packet = transport.packets.poll();
- assertEquals(REMOTE_DOMAIN, packet.getTo().toString());
- assertEquals(LOCAL_JID, packet.getFrom().toString());
- assertEquals(EXPECTED_DISCO_ITEMS_GET, packet.toString());
- checkAndResetStats(1, 0, 0); // started
- }
-
- /**
- * Tests that starting disco sends a disco#items to the remote server, and subsequent
- * disco requests are not sent until there is a retransmit timeout. Also test that the callback
- * is run even after timing out.
- */
- public void testDiscoRetransmitsOnNoReply() {
- int expectedFailures = 0;
- int expectedPackets = 0;
-
- disco.discoverRemoteJid(REMOTE_DOMAIN, discoCallback);
- checkAndResetStats(1, 0, 0); // started
-
- expectedFailures++;
- expectedPackets++;
- assertEquals("Should have sent disco packet", expectedPackets, transport.packetsSent);
-
- for (int i = 1; i < RemoteDisco.MAXIMUM_DISCO_ATTEMPTS; i++) {
- manager.causeImmediateTimeout(transport.packets.remove());
- expectedPackets++;
- assertEquals("Should have retried", expectedPackets, transport.packetsSent);
-
- disco.discoverRemoteJid(REMOTE_DOMAIN, discoCallback);
- disco.discoverRemoteJid(REMOTE_DOMAIN, discoCallback);
- expectedFailures += 2;
- assertEquals("Should not have sent more outgoing packets",
- expectedPackets, transport.packetsSent);
-
- // Should be no activity on the callback
- verifyZeroInteractions(discoCallback);
- }
-
- // This final timeout should cause all callbacks to be invoked.
- manager.causeImmediateTimeout(transport.packets.remove());
- verify(discoCallback, times(expectedFailures)).onFailure(anyString());
- verify(discoCallback, never()).onSuccess(anyString());
- checkAndResetStats(0, 0, 1); // failed
-
- // The next request should return a cached response.
- SuccessFailCallback<String, String> cachedDiscoCallback = createMockCallback();
- disco.discoverRemoteJid(REMOTE_DOMAIN, cachedDiscoCallback);
- verify(cachedDiscoCallback).onFailure(anyString());
- verify(cachedDiscoCallback, never()).onSuccess(anyString());
-
- // No more outgoing packets.
- assertEquals("Should not have sent more outgoing packets",
- expectedPackets, transport.packetsSent);
- checkAndResetStats(0, 0, 0); // no additional varz
- }
-
- /**
- * Tests that starting disco sends a disco#items to the remote server, and no
- * subsequent disco requests start after we get a successful reply.
- */
- public void testDiscoNoRetransmitsAfterReply() throws ExecutionException {
- XmppUtil.fakeUniqueId = DISCO_ITEMS_ID;
- disco.discoverRemoteJid(REMOTE_DOMAIN, discoCallback);
- checkAndResetStats(1, 0, 0); // started
- assertEquals("Expected disco packet to be sent", 1, transport.packetsSent);
- Packet packet = transport.lastPacketSent;
- assertEquals(EXPECTED_DISCO_ITEMS_GET, packet.toString());
- assertTrue(disco.isDiscoRequestPending(REMOTE_DOMAIN));
-
- IQ discoItemsResult = createDiscoItems(true /* wave */, false /* not pubsub */);
- discoItemsResult.setID(packet.getID());
- XmppUtil.fakeUniqueId = DISCO_INFO_ID;
- manager.receivePacket(discoItemsResult);
- assertEquals("Expected disco info get to be sent", 2, transport.packetsSent);
- assertEquals(EXPECTED_DISCO_INFO_GET, transport.lastPacketSent.toString());
-
- // Check that we haven't yet finished - we should only get up to sending the items request.
- verifyZeroInteractions(discoCallback);
- assertTrue(disco.isDiscoRequestPending(REMOTE_DOMAIN));
- checkAndResetStats(0, 0, 0); // no additional varz
- }
-
- /**
- * Tests stage 2 of disco. Inject a disco#items into the disco code, check it
- * calls disco#info on the JID.
- */
- public void testDiscoItemsResult() {
- initiateDiscoRequest(); // sends one packet.
- checkAndResetStats(1, 0, 0); // started
- // create with wave, no pubsub
- IQ discoItemsResult = createDiscoItems(true /* wave */, false /* not pubsub */);
-
- XmppUtil.fakeUniqueId = DISCO_INFO_ID;
- manager.receivePacket(discoItemsResult);
- assertEquals(2, transport.packetsSent);
- Packet packet = transport.lastPacketSent;
- assertEquals(REMOTE_JID, packet.getTo().toString());
- assertEquals(LOCAL_JID, packet.getFrom().toString());
- assertEquals(EXPECTED_DISCO_INFO_GET, packet.toString());
- checkAndResetStats(0, 0, 0); // no additional varz
- }
-
- /**
- * Tests stage 3 of disco. Inject a disco#info into the disco code (one that
- * matches wave) and check the callback gets run.
- */
- public void testDiscoInfoResultWave() {
- initiateDiscoRequest(); // sends one packet.
- checkAndResetStats(1, 0, 0); // started
- // create with wave, no pubsub
- IQ discoItemsResult = createDiscoItems(true /* wave */, false /* not pubsub */);
- // Start the process.
- XmppUtil.fakeUniqueId = DISCO_INFO_ID;
- manager.receivePacket(discoItemsResult);
- assertEquals(2, transport.packetsSent);
- Packet packet = transport.lastPacketSent;
- assertEquals(EXPECTED_DISCO_INFO_GET, packet.toString());
- // create a wave disco result, inject into disco.
- manager.receivePacket(createDiscoInfo(true /* wave */));
- assertEquals(2, transport.packetsSent);
- verify(discoCallback).onSuccess(eq(REMOTE_JID));
- checkAndResetStats(0, 1, 0); // success
- }
-
- /**
- * Tests stage 3 of disco. Inject a disco#info into the disco code (one that
- * doesn't match wave) and check callback gets run with null.
- */
- public void testDiscoInfoResultPubsub() {
- initiateDiscoRequest(); // sends one packet.
- checkAndResetStats(1, 0, 0); // started
- transport.packets.remove(); // remove packet from queue
-
- // create with just pubsub
- IQ discoItemsResult = createDiscoItems(false /* not wave */, true /* pubsub */);
- XmppUtil.fakeUniqueId = DISCO_INFO_ID;
- manager.receivePacket(discoItemsResult);
- assertEquals(3, transport.packetsSent);
-
- // Expect a wave request even if we didn't send it (automatic wave request)
- Packet wavePacket = transport.packets.poll();
- assertEquals(EXPECTED_DISCO_INFO_GET, wavePacket.toString());
-
- // Expect pubsub packet
- Packet pubsubPacket = transport.packets.poll();
- assertEquals(EXPECTED_DISCO_INFO_GET_PUBSUB, pubsubPacket.toString());
-
- // Create pubsub response, should not yet invoke callback
- manager.receivePacket(createDiscoInfo(false /* not wave */));
- verifyZeroInteractions(discoCallback);
-
- // Create response to wave request, with ITEM_NOT_FOUND
- IQ failWaveResponse = IQ.createResultIQ((IQ) wavePacket);
- failWaveResponse.setError(PacketError.Condition.item_not_found);
- manager.receivePacket(failWaveResponse);
- verify(discoCallback).onFailure(anyString());
- checkAndResetStats(0, 0, 1); // failed
-
- // No more outgoing packets
- assertEquals(3, transport.packetsSent);
- }
-
- /**
- * Tests stage 3 of disco. Inject a disco#items into the disco code with
- * pubsub, then wave. Then give it pubsub's disco#info, and check it then
- * sends a disco#info for wave.
- */
- public void testDiscoInfoResultPubsubAndWave() {
- initiateDiscoRequest(); // sends one packet.
- checkAndResetStats(1, 0, 0); // started
-
- transport.packets.remove(); // remove packet from queue
-
- // create with both pubsub and wave
- IQ discoItemsResult = createDiscoItems(true /* wave */, true /* pubsub */);
- XmppUtil.fakeUniqueId = DISCO_INFO_ID;
- manager.receivePacket(discoItemsResult);
- assertEquals(3, transport.packetsSent);
-
- // Expect a wave request
- Packet wavePacket = transport.packets.poll();
- assertEquals(EXPECTED_DISCO_INFO_GET, wavePacket.toString());
-
- // Expect pubsub packet
- Packet pubsubPacket = transport.packets.poll();
- assertEquals(EXPECTED_DISCO_INFO_GET_PUBSUB, pubsubPacket.toString());
-
- // Create pubsub response, should not yet invoke callback
- manager.receivePacket(createDiscoInfo(false /* not wave */));
- verifyZeroInteractions(discoCallback);
-
- checkAndResetStats(0, 0, 0); // not finished yet
-
- // Create response to wave request, with ITEM_NOT_FOUND
- manager.receivePacket(createDiscoInfo(true /* wave */));
- verify(discoCallback).onSuccess(eq(REMOTE_JID));
-
- checkAndResetStats(0, 1, 0); // success
-
- // No more outgoing packets
- assertEquals(3, transport.packetsSent);
- }
-
- /**
- * Tests that if disco is started for a remote server for which we already
- * have the result, the cached result is just passed to the callback.
- */
- public void testDiscoStartWithCachedResult() {
- disco.testInjectInDomainToJidMap(REMOTE_DOMAIN, REMOTE_JID);
- disco.discoverRemoteJid(REMOTE_DOMAIN, discoCallback);
- assertEquals(0, transport.packetsSent);
- verify(discoCallback).onSuccess(eq(REMOTE_JID));
- checkAndResetStats(0, 0, 0); // no varz updated
- }
-
- /**
- * Tests that we return a (useless, empty) IQ for a disco#items.
- */
- public void testDiscoGetDiscoItems() {
- IQ request = createDiscoRequest(XmppNamespace.NAMESPACE_DISCO_ITEMS);
- manager.receivePacket(request);
- assertEquals(1, transport.packetsSent);
- Packet packet = transport.lastPacketSent;
- assertEquals(REMOTE_JID, packet.getTo().toString());
- assertEquals(LOCAL_JID, packet.getFrom().toString());
- assertEquals(EXPECTED_DISCO_ITEMS_RESULT, packet.toString());
- }
-
- /**
- * Tests that we return the right wave-identifying IQ for a disco#info.
- */
- public void testDiscoGetDiscoInfo() {
- IQ request = createDiscoRequest(XmppNamespace.NAMESPACE_DISCO_INFO);
- manager.receivePacket(request);
- assertEquals(1, transport.packetsSent);
- Packet packet = transport.lastPacketSent;
- assertEquals(REMOTE_JID, packet.getTo().toString());
- assertEquals(LOCAL_JID, packet.getFrom().toString());
- assertEquals(EXPECTED_DISCO_INFO_RESULT, packet.toString());
- }
-
- /**
- * Check the expiry of disco results behaves as expected when successful.
- */
- public void testDiscoCachedResultsExpiryOnSuccess() {
- DateTimeUtils.setCurrentMillisFixed(0);
- SuccessFailCallback<String, String> cb = createMockCallback();
- XmppUtil.fakeUniqueId = DISCO_ITEMS_ID;
- disco.discoverRemoteJid(REMOTE_DOMAIN, cb);
- checkAndResetStats(1, 0, 0); // started once only
- assertEquals(1, transport.packetsSent);
- XmppUtil.fakeUniqueId = DISCO_INFO_ID;
- manager.receivePacket(createDiscoItems(true /* wave */, false /* pubsub */));
- assertEquals(2, transport.packetsSent); // original items plus info
- manager.receivePacket(createDiscoInfo(true /* wave */));
- verify(cb).onSuccess(eq(REMOTE_JID));
- verify(cb, never()).onFailure(anyString());
- checkAndResetStats(0, 1, 0); // success
-
- XmppUtil.fakeUniqueId = DISCO_ITEMS_ID;
- // We shouldn't trigger disco again - we're in an OK state.
- cb = createMockCallback();
- disco.discoverRemoteJid(REMOTE_DOMAIN, cb);
- assertEquals(2, transport.packetsSent); // cached result - no more packets sent
- verify(cb).onSuccess(eq(REMOTE_JID));
- verify(cb, never()).onFailure(anyString());
- checkAndResetStats(0, 0, 0); // nothing
-
- // Time passes...
- tick((DISCO_SUCCESS_EXPIRY_SECS + 1) * 1000);
-
- cb = createMockCallback();
- disco.discoverRemoteJid(REMOTE_DOMAIN, cb);
- assertEquals(3, transport.packetsSent); // 1 more packet - disco restart
- checkAndResetStats(1, 0, 0); // started
- XmppUtil.fakeUniqueId = DISCO_INFO_ID;
- manager.receivePacket(createDiscoItems(true /* wave */, false /* pubsub */));
- assertEquals(4, transport.packetsSent); // 1 more packet - disco restart info packet
- manager.receivePacket(createDiscoInfo(true /* wave */));
- verify(cb).onSuccess(eq(REMOTE_JID));
- verify(cb, never()).onFailure(anyString());
- checkAndResetStats(0, 1, 0); // success
- }
-
- /**
- * Check the expiry of disco results behaves as expected when disco fails.
- * We send back wave and pubsub requests, identifying both as not wave. We
- * can't just send back pubsub, as the code in RemoteDisco always asks for
- * wave.foo.
- */
- public void testDiscoCachedResultsExpiryOnFailure() {
- DateTimeUtils.setCurrentMillisFixed(0);
- SuccessFailCallback<String, String> cb = createMockCallback();
- XmppUtil.fakeUniqueId = DISCO_ITEMS_ID;
- disco.discoverRemoteJid(REMOTE_DOMAIN, cb);
- assertEquals(1, transport.packetsSent);
- XmppUtil.fakeUniqueId = DISCO_INFO_ID;
- checkAndResetStats(1, 0, 0); // started
- manager.receivePacket(createDiscoItems(true /* wave */, true /* pubsub */));
- assertEquals(3, transport.packetsSent); // original items plus info
- manager.receivePacket(createDiscoInfo(false /* pubsub */));
- manager.receivePacket(createBrokenDiscoInfoForWaveJid());
- verify(cb, never()).onSuccess(anyString());
- verify(cb).onFailure(anyString());
- checkAndResetStats(0, 0, 1); // failed
-
- XmppUtil.fakeUniqueId = DISCO_ITEMS_ID;
- // We shouldn't trigger disco again - we're in a cached state.
- cb = createMockCallback();
- disco.discoverRemoteJid(REMOTE_DOMAIN, cb);
- assertEquals(3, transport.packetsSent); // cached result - no more packets sent
- verify(cb, never()).onSuccess(anyString());
- verify(cb).onFailure(anyString());
- checkAndResetStats(0, 0, 0); // nothing
-
- // Time passes...
- tick((DISCO_FAIL_EXPIRY_SECS + 1) * 1000);
-
- cb = createMockCallback();
- disco.discoverRemoteJid(REMOTE_DOMAIN, cb);
- checkAndResetStats(1, 0, 0); // started
- assertEquals(4, transport.packetsSent); // 1 more packet - disco restart
- XmppUtil.fakeUniqueId = DISCO_INFO_ID;
- manager.receivePacket(createDiscoItems(true /* wave */, true /* pubsub */));
- assertEquals(6, transport.packetsSent); // 2 more packet - disco restart info packet
- manager.receivePacket(createDiscoInfo(false /* pubsub */));
- manager.receivePacket(createBrokenDiscoInfoForWaveJid());
-
- verify(cb, never()).onSuccess(anyString());
- verify(cb).onFailure(anyString());
- checkAndResetStats(0, 0, 1); // failed
- }
-
- /**
- * Tests that if a disco items requests fails due to some error, that we still
- * perform a disco info request on fallback JIDs.
- */
- public void testDiscoItemsFallback() {
- XmppUtil.fakeUniqueId = DISCO_INFO_ID;
- disco.discoverRemoteJid(REMOTE_DOMAIN, discoCallback);
- assertEquals("Should have sent disco packet", 1, transport.packetsSent);
- checkAndResetStats(1, 0, 0); // started
-
- // Generate an error response.
- IQ errorResponse = IQ.createResultIQ((IQ) transport.packets.poll());
- errorResponse.setError(PacketError.Condition.conflict);
- manager.receivePacket(errorResponse);
-
- // Confirm that two outgoing packets are sent.
- assertEquals(3, transport.packetsSent);
-
- // Expect a wave request
- Packet wavePacket = transport.packets.poll();
- assertEquals(EXPECTED_DISCO_INFO_GET, wavePacket.toString());
-
- // Expect packet targeted at TLD
- Packet pubsubPacket = transport.packets.poll();
- assertEquals(REMOTE_DOMAIN, pubsubPacket.getTo().toBareJID());
- checkAndResetStats(0, 0, 0); // not finished yet
- }
-
- /**
- * Tests sending multiple disco requests result in multiple callbacks.
- */
- public void testMultipleDiscoRequestsToSameDomain() {
- final int CALL_COUNT = 10;
- XmppUtil.fakeUniqueId = DISCO_ITEMS_ID;
- List<SuccessFailCallback<String, String>> callbacks = Lists.newLinkedList();
- for (int i = 0; i < CALL_COUNT; i++) {
- SuccessFailCallback<String, String> cb = createMockCallback();
- assertTrue(callbacks.add(cb));
- disco.discoverRemoteJid(REMOTE_DOMAIN, cb);
- }
- // Expect only one disco request to be sent.
- assertEquals(1, transport.packetsSent);
- Packet packet = transport.lastPacketSent;
- assertEquals(REMOTE_DOMAIN, packet.getTo().toString());
- assertEquals(LOCAL_JID, packet.getFrom().toString());
- assertEquals(EXPECTED_DISCO_ITEMS_GET, packet.toString());
-
- XmppUtil.fakeUniqueId = DISCO_INFO_ID;
- manager.receivePacket(createDiscoItems(true /* wave */, true /* pubsub */));
- manager.receivePacket(createDiscoInfo(true /* wave */));
-
- for(SuccessFailCallback<String, String> cb : callbacks) {
- verify(cb).onSuccess(eq(REMOTE_JID));
- verify(cb, never()).onFailure(anyString());
- }
- }
-
- /**
- * Create a disco#info result from the remote server.
- *
- * @param forWaveJID if true, it's for the remote Wave JID, else it's the
- * remote pubsub JID.
- * @return the new IQ packet.
- */
- private IQ createDiscoInfo(boolean forWaveJID) {
- IQ response = new IQ(IQ.Type.result);
- response.setTo(LOCAL_JID);
- response.setID(DISCO_INFO_ID);
- Element query = response.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_INFO);
-
- if (forWaveJID) {
- response.setFrom(REMOTE_JID);
- query.addElement("identity")
- .addAttribute("category", MockDisco.config.getString("federation.disco_info_category"))
- .addAttribute("type", MockDisco.config.getString("federation.disco_info_type"))
- .addAttribute("name", SERVER_DESCRIPTION);
- query.addElement("feature")
- .addAttribute("var", XmppNamespace.NAMESPACE_WAVE_SERVER);
- } else {
- response.setFrom(REMOTE_PUBSUB_JID);
- query.addElement("identity")
- .addAttribute("category", "pubsub")
- .addAttribute("type", "whatever")
- .addAttribute("name", "not a wave server");
- query.addElement("feature")
- .addAttribute("var", XmppNamespace.NAMESPACE_PUBSUB);
- }
- return response;
- }
-
- /**
- * Create a wave.other.com info result that identifies it as non-wave. needed to force
- * failure in the case of the wave.foo fallback.
- * @return the new IQ result packet
- */
- private IQ createBrokenDiscoInfoForWaveJid() {
- IQ response = new IQ(IQ.Type.result);
- response.setTo(LOCAL_JID);
- response.setID(DISCO_INFO_ID);
- Element query = response.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_INFO);
- response.setFrom(REMOTE_JID);
- query.addElement("identity")
- .addAttribute("category", "pubsub")
- .addAttribute("type", "whatever")
- .addAttribute("name", "not a wave server");
- query.addElement("feature")
- .addAttribute("var", XmppNamespace.NAMESPACE_PUBSUB);
- return response;
- }
-
- /**
- * Create a disco#items result, with either or both of a pubsub and a wave
- * JID.
- *
- * @param wave if true, create a wave JID item.
- * @param pubsub if true, create a pubsub JID item.
- * @return the new IQ packet.
- */
- private IQ createDiscoItems(boolean wave, boolean pubsub) {
- IQ discoItemsResult = new IQ(IQ.Type.result);
- discoItemsResult.setFrom(REMOTE_DOMAIN);
- discoItemsResult.setTo(LOCAL_JID);
- discoItemsResult.setID(DISCO_ITEMS_ID);
- Element discoBody =
- discoItemsResult.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_ITEMS);
- if (wave) {
- discoBody.addElement("item").addAttribute("jid", REMOTE_JID);
- }
- if (pubsub) {
- discoBody.addElement("item").addAttribute("jid", REMOTE_PUBSUB_JID);
- }
- return discoItemsResult;
- }
-
- /**
- * Create a disco#info or disco#items query.
- *
- * @param namespace the namespace of the query - disco#info or disco#items
- * @return the new IQ packet
- */
- private IQ createDiscoRequest(String namespace) {
- IQ request = new IQ(IQ.Type.get);
- switch (namespace) {
- case XmppNamespace.NAMESPACE_DISCO_ITEMS:
- request.setID(DISCO_ITEMS_ID);
- break;
- case XmppNamespace.NAMESPACE_DISCO_INFO:
- request.setID(DISCO_INFO_ID);
- break;
- default:
- throw new IllegalArgumentException();
- }
- request.setTo(LOCAL_JID);
- request.setFrom(REMOTE_JID);
- request.setChildElement("query", namespace);
- return request;
- }
-
- @SuppressWarnings("unchecked")
- private SuccessFailCallback<String, String> createMockCallback() {
- return mock(SuccessFailCallback.class);
- }
-
- private void checkAndResetStats(int started, int success, int failed) {
- assertEquals("start counter", started, counterStarted.getAndSet(0));
- assertEquals("success counter", success, counterSuccess.getAndSet(0));
- assertEquals("failed counter", failed, counterFailed.getAndSet(0));
- }
-
- private void resetVarz() {
- counterStarted.getAndSet(0);
- counterSuccess.getAndSet(0);
- counterFailed.getAndSet(0);
- }
-
- /**
- * Advance the clock.
- *
- * @param millis milliseconds to advance clock
- */
- private void tick(int millis) {
- DateTimeUtils.setCurrentMillisFixed(DateTimeUtils.currentTimeMillis() + millis);
- }
-
- /**
- * Initiate a simple disco request to REMOTE_DOMAIN.
- */
- private void initiateDiscoRequest() {
- XmppUtil.fakeUniqueId = DISCO_ITEMS_ID;
- disco.discoverRemoteJid(REMOTE_DOMAIN, discoCallback);
- assertEquals("Disco packet should have been sent", 1, transport.packetsSent);
- Packet packet = transport.lastPacketSent;
- assertEquals(EXPECTED_DISCO_ITEMS_GET, packet.toString());
- }
-}
diff --git a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHostForDomainTest.java b/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHostForDomainTest.java
deleted file mode 100644
index 499b692..0000000
--- a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHostForDomainTest.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import com.google.protobuf.ByteString;
-
-import com.typesafe.config.ConfigFactory;
-import junit.framework.TestCase;
-
-import org.waveprotocol.wave.federation.ProtocolHashedVersionFactory;
-import org.waveprotocol.wave.federation.WaveletFederationListener;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.waveprotocol.wave.federation.Proto.ProtocolHashedVersion;
-import org.waveprotocol.wave.federation.xmpp.MockDisco.PendingMockDisco;
-import org.waveprotocol.wave.model.id.WaveId;
-import org.waveprotocol.wave.model.id.WaveletId;
-import org.waveprotocol.wave.model.id.WaveletName;
-import org.waveprotocol.wave.model.id.URIEncoderDecoder.EncodingException;
-import org.xmpp.packet.Packet;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-/**
- * Tests for {@link XmppFederationHostForDomain}.
- *
- * @author arb@google.com (Anthony Baxter)
- * @author thorogood@google.com (Sam Thorogood)
- */
-
-public class XmppFederationHostForDomainTest extends TestCase {
-
- private final static String LOCAL_DOMAIN = "acmewave.com";
- private final static String LOCAL_JID = "wave." + LOCAL_DOMAIN;
- private final static String REMOTE_DOMAIN = "initech-corp.com";
- private final static String REMOTE_JID = "wave." + REMOTE_DOMAIN;
-
- private final static WaveletName WAVELET_NAME =
- WaveletName.of(WaveId.of(REMOTE_DOMAIN, "wave"), WaveletId.of(REMOTE_DOMAIN, "wavelet"));
- private final static ProtocolHashedVersion WAVELET_VERSION =
- ProtocolHashedVersionFactory.createVersionZero(WAVELET_NAME);
- private final static ByteString DELTA_BYTESTRING =
- ByteString.copyFromUtf8("Irrelevant delta bytes");
-
- private final static String TEST_ID_SUFFIX = "-1-sometestID";
-
-
- private MockDisco disco;
- private XmppFederationHostForDomain fedHost;
- private MockOutgoingPacketTransport transport;
-
- private static final String EXPECTED_UPDATE_MESSAGE;
-
- static {
- try {
- EXPECTED_UPDATE_MESSAGE =
- "\n<message type=\"normal\" from=\"" + LOCAL_JID + "\""
- + " to=\"" + REMOTE_JID + "\" id=\"" + "1" + TEST_ID_SUFFIX + "\">\n"
- + " <request xmlns=\"urn:xmpp:receipts\"/>\n"
- + " <event xmlns=\"http://jabber.org/protocol/pubsub#event\">\n"
- + " <items>\n"
- + " <item>\n"
- + " <wavelet-update"
- + " xmlns=\"http://waveprotocol.org/protocol/0.2/waveserver\""
- + " wavelet-name=\"" + XmppUtil.waveletNameCodec.waveletNameToURI(WAVELET_NAME) + "\">\n"
- + " <applied-delta>"
- + "<![CDATA[" + Base64Util.encode(DELTA_BYTESTRING) + "]]></applied-delta>\n"
- + " </wavelet-update>\n"
- + " </item>\n"
- + " </items>\n"
- + " </event>\n"
- + "</message>";
- } catch (EncodingException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static final List<ByteString> NO_DELTAS = Collections.emptyList();
-
- @Override
- public void setUp() {
- XmppUtil.fakeIdGenerator = new Callable<String>() {
- private int idCounter = 0;
-
- public String call() throws Exception {
- idCounter++;
- return idCounter + TEST_ID_SUFFIX;
- }
- };
-
- disco = new MockDisco();
- transport = new MockOutgoingPacketTransport();
- XmppManager manager =
- new XmppManager(mock(XmppFederationHost.class), mock(XmppFederationRemote.class),
- disco, transport, ConfigFactory.parseString("federation.xmpp_jid : "
- + LOCAL_JID));
- fedHost = new XmppFederationHostForDomain(
- REMOTE_DOMAIN, manager, disco, ConfigFactory.parseString("federation.xmpp_jid : "
- + LOCAL_JID));
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- XmppUtil.fakeIdGenerator = null; // reset so as to not leave the class in a bad state.
- }
-
- /**
- * Tests that commit sends a correctly formatted XMPP packet.
- * @throws Exception should not be thrown
- */
- public void testCommit() throws Exception {
- commit(null);
- assertEquals(0, transport.packetsSent);
-
- successDiscoRequest();
- checkCommitMessage();
- }
-
- /**
- * Test we don't fall in a heap if disco fails.
- * @throws Exception should not be thrown
- */
- public void testCommitWithFailedDisco() throws Exception {
- WaveletFederationListener.WaveletUpdateCallback callback =
- mock(WaveletFederationListener.WaveletUpdateCallback.class);
- commit(callback);
- failDiscoRequest();
-
- // No packets should be sent.
- verify(callback).onFailure((FederationError) any());
- assertEquals(0, transport.packetsSent);
- }
-
- /**
- * Tests that update sends a correctly formatted XMPP packet.
- * @throws Exception should not be thrown
- */
- public void testUpdate() throws Exception {
- update(null);
- assertEquals(0, transport.packetsSent);
-
- successDiscoRequest();
- checkUpdateMessage();
- }
-
- /**
- * Tests that update sends a correctly formatted XMPP packet.
- * @throws Exception should not be thrown
- */
- public void testUpdateAndCommit() throws Exception {
-
- update(new WaveletFederationListener.WaveletUpdateCallback() {
-
- public void onSuccess() {
- // expected
- }
-
- public void onFailure(FederationError error) {
- fail("update failed: " + error);
- }
- });
- commit(new WaveletFederationListener.WaveletUpdateCallback() {
-
- public void onSuccess() {
- // expected
- }
-
- public void onFailure(FederationError error) {
- fail("commit failed: " + error);
- }
- });
- assertEquals(0, transport.packetsSent);
-
- successDiscoRequest(); // 2 packets outstanding - the commit and the update
- checkUpdateAndCommit();
- }
-
-
- /**
- * Test we don't fall in a heap if disco fails.
- * @throws Exception should not be thrown
- */
- public void testUpdateWithFailedDisco() throws Exception {
- WaveletFederationListener.WaveletUpdateCallback callback =
- mock(WaveletFederationListener.WaveletUpdateCallback.class);
- WaveletFederationListener.WaveletUpdateCallback callback2 =
- mock(WaveletFederationListener.WaveletUpdateCallback.class);
- update(callback);
- commit(callback2);
- failDiscoRequest();
-
- // No packets should be sent.
- verify(callback).onFailure((FederationError) any());
- verify(callback2).onFailure((FederationError) any());
- assertEquals(0, transport.packetsSent);
- }
-
- /**
- * Send a single commit notice containing a dummy version via {@link #fedHost}.
- *
- * @param updateCallback result callback
- */
- private void commit(WaveletFederationListener.WaveletUpdateCallback updateCallback) {
- fedHost.waveletUpdate(WAVELET_NAME, NO_DELTAS, WAVELET_VERSION, updateCallback);
- }
-
- /**
- * Send a single update message containing a dummy delta via {@link #fedHost}.
- *
- * @param updateCallback result callback
- */
- private void update(WaveletFederationListener.WaveletUpdateCallback updateCallback) {
- fedHost.waveletUpdate(WAVELET_NAME, Collections.singletonList(DELTA_BYTESTRING),
- null, updateCallback);
- }
-
- /**
- * Confirm that there is one outstanding disco request to REMOTE_DOMAIN, and
- * force its success.
- */
- private void successDiscoRequest() throws ExecutionException {
- assertEquals(1, disco.pending.size());
- PendingMockDisco v = disco.pending.get(REMOTE_DOMAIN);
- assertEquals(REMOTE_DOMAIN, v.remoteDomain);
- while (!v.callbacks.isEmpty()) {
- v.callbacks.poll().onSuccess(REMOTE_JID);
- }
- }
-
- /**
- * Confirm that there is one outstanding disco request to REMOTE_DOMAIN, and
- * force its failure.
- */
- private void failDiscoRequest() throws ExecutionException {
- assertEquals(1, disco.pending.size());
- PendingMockDisco v = disco.pending.get(REMOTE_DOMAIN);
- assertEquals(REMOTE_DOMAIN, v.remoteDomain);
- while (!v.callbacks.isEmpty()) {
- v.callbacks.poll().onFailure("Forced failure");
- }
- }
-
- /**
- * Check the commit message is as expected.
- */
- private void checkCommitMessage() {
- assertEquals(1, transport.packetsSent);
- Packet packet = transport.lastPacketSent;
- assertEquals(REMOTE_JID, packet.getTo().toString());
- assertEquals(LOCAL_JID, packet.getFrom().toString());
- assertEquals(generateExpectedCommitMessage("1" + TEST_ID_SUFFIX), packet.toString());
- }
-
- /**
- * Checks the update message is as expected.
- */
- private void checkUpdateMessage() {
- assertEquals(1, transport.packetsSent);
- Packet packet = transport.lastPacketSent;
- assertEquals(REMOTE_JID, packet.getTo().toString());
- assertEquals(LOCAL_JID, packet.getFrom().toString());
- assertEquals(EXPECTED_UPDATE_MESSAGE, packet.toString());
- }
-
- /**
- * Checks an update and then a commit message were sent.
- */
- private void checkUpdateAndCommit() {
- assertEquals(2, transport.packetsSent);
- Packet packet = transport.packets.poll();
- assertEquals(REMOTE_JID, packet.getTo().toString());
- assertEquals(LOCAL_JID, packet.getFrom().toString());
- assertEquals(EXPECTED_UPDATE_MESSAGE, packet.toString());
-
- packet = transport.packets.poll();
- assertEquals(REMOTE_JID, packet.getTo().toString());
- assertEquals(LOCAL_JID, packet.getFrom().toString());
- assertEquals(generateExpectedCommitMessage("2" + TEST_ID_SUFFIX), packet.toString());
- }
-
- private static String generateExpectedCommitMessage(String testId) {
- try {
- return
- "\n<message type=\"normal\" from=\"" + LOCAL_JID + "\""
- + " to=\"" + REMOTE_JID + "\" id=\"" + testId + "\">\n"
- + " <request xmlns=\"urn:xmpp:receipts\"/>\n"
- + " <event xmlns=\"http://jabber.org/protocol/pubsub#event\">\n"
- + " <items>\n"
- + " <item>\n"
- + " <wavelet-update"
- + " xmlns=\"http://waveprotocol.org/protocol/0.2/waveserver\""
- + " wavelet-name=\"" + XmppUtil.waveletNameCodec.waveletNameToURI(WAVELET_NAME) + "\">\n"
- + " <commit-notice version=\"" + WAVELET_VERSION.getVersion() + "\" history-hash=\""
- + Base64Util.encode(WAVELET_VERSION.getHistoryHash())
- + "\"/>\n"
- + " </wavelet-update>\n"
- + " </item>\n"
- + " </items>\n"
- + " </event>\n"
- + "</message>";
- } catch (EncodingException e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/XmppFederationRemoteTest.java b/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/XmppFederationRemoteTest.java
deleted file mode 100644
index 63eab47..0000000
--- a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/XmppFederationRemoteTest.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.waveprotocol.wave.federation.xmpp;
-
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.TestCase;
-import org.dom4j.Element;
-import org.mockito.ArgumentCaptor;
-import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
-import org.waveprotocol.wave.federation.Proto.ProtocolHashedVersion;
-import org.waveprotocol.wave.federation.Proto.ProtocolSignedDelta;
-import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo;
-import org.waveprotocol.wave.federation.ProtocolHashedVersionFactory;
-import org.waveprotocol.wave.federation.WaveletFederationListener;
-import org.waveprotocol.wave.federation.WaveletFederationListener.WaveletUpdateCallback;
-import org.waveprotocol.wave.federation.WaveletFederationProvider;
-import org.waveprotocol.wave.federation.WaveletFederationProvider.DeltaSignerInfoResponseListener;
-import org.waveprotocol.wave.federation.WaveletFederationProvider.HistoryResponseListener;
-import org.waveprotocol.wave.federation.WaveletFederationProvider.PostSignerInfoResponseListener;
-import org.waveprotocol.wave.federation.WaveletFederationProvider.SubmitResultListener;
-import org.waveprotocol.wave.model.id.URIEncoderDecoder.EncodingException;
-import org.waveprotocol.wave.model.id.WaveId;
-import org.waveprotocol.wave.model.id.WaveletId;
-import org.waveprotocol.wave.model.id.WaveletName;
-import org.xmpp.packet.IQ;
-import org.xmpp.packet.Message;
-import org.xmpp.packet.PacketError;
-
-import java.util.List;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
-
-/**
- * Tests for {@link XmppFederationRemote}.
- *
- * TODO(thorogood,arb): This class actually test round-trips sent from an
- * XmppFederationRemote to a XmppFederationHost.
- *
- * @author arb@google.com (Anthony Baxter)
- * @author thorogood@google.com (Sam Thorogood)
- */
-
-public class XmppFederationRemoteTest extends TestCase {
-
- private final static String LOCAL_DOMAIN = "acmewave.com";
- private final static String LOCAL_JID = "wave." + LOCAL_DOMAIN;
- private final static String REMOTE_DOMAIN = "initech-corp.com";
- private final static String REMOTE_JID = "wave." + REMOTE_DOMAIN;
-
- private final static WaveletName REMOTE_WAVELET =
- WaveletName.of(WaveId.of(REMOTE_DOMAIN, "wave"), WaveletId.of(REMOTE_DOMAIN, "wavelet"));
- private final static ProtocolHashedVersion START_VERSION =
- ProtocolHashedVersionFactory.createVersionZero(REMOTE_WAVELET);
- private final static ByteString DELTA_BYTESTRING =
- ByteString.copyFromUtf8("Irrelevant delta bytes");
- private final static ProtocolHashedVersion VERSION_ONE =
- ProtocolHashedVersionFactory.create(DELTA_BYTESTRING, START_VERSION, 1);
-
- private final static ProtocolSignedDelta DUMMY_SIGNED_DELTA =
- ProtocolSignedDelta.newBuilder().setDelta(ByteString.copyFromUtf8("fake blahblah")).build();
-
- private final static String TEST_ID = "1-1-sometestID";
-
- private final static ByteString FAKE_SIGNER_ID = ByteString.copyFromUtf8("Hello Signer!");
- private final static ProtocolSignerInfo FAKE_SIGNER_INFO = ProtocolSignerInfo.newBuilder()
- .setHashAlgorithm(ProtocolSignerInfo.HashAlgorithm.SHA256)
- .setDomain(REMOTE_DOMAIN)
- .addCertificate(ByteString.copyFromUtf8("Test certificate")).build();
-
- private MockOutgoingPacketTransport transport;
- private WaveletFederationListener.Factory mockUpdateListenerFactory;
- private MockDisco disco;
- private XmppManager manager;
-
- private WaveletFederationProvider mockProvider;
- private WaveletFederationListener mockUpdateListener;
-
- // The remote represents the 'caller' for all unit tests in this class.
- private XmppFederationRemote remote;
-
- // The host represents the 'callee' for all unit tests in this class.
- private XmppFederationHost host;
-
- private static final String EXPECTED_RECEIPT_MESSAGE =
- "\n<message id=\"" + TEST_ID + "\" to=\"" + REMOTE_JID + "\""
- + " from=\"" + LOCAL_JID + "\">\n"
- + " <received xmlns=\"urn:xmpp:receipts\"/>\n"
- + "</message>";
-
- private static final String EXPECTED_SUBMIT_REQUEST;
- private static final String EXPECTED_HISTORY_REQUEST;
-
- static {
- try {
- String uri = XmppUtil.waveletNameCodec.waveletNameToURI(REMOTE_WAVELET);
- EXPECTED_SUBMIT_REQUEST =
- "\n<iq type=\"set\" id=\"" + TEST_ID + "\" from=\"" + LOCAL_JID + "\"" +
- " to=\"" + REMOTE_JID + "\">\n"
- + " <pubsub xmlns=\"http://jabber.org/protocol/pubsub\">\n"
- + " <publish node=\"wavelet\">\n"
- + " <item>\n"
- + " <submit-request xmlns=\"http://waveprotocol.org/protocol/0.2/waveserver\">\n"
- + " <delta wavelet-name=\"" + uri + "\">" +
- "<![CDATA[" + Base64Util.encode(DUMMY_SIGNED_DELTA) + "]]></delta>\n"
- + " </submit-request>\n"
- + " </item>\n"
- + " </publish>\n"
- + " </pubsub>\n"
- + "</iq>";
-
- EXPECTED_HISTORY_REQUEST =
- "\n<iq type=\"get\" id=\"" + TEST_ID + "\" from=\"" + LOCAL_JID + "\"" +
- " to=\"" + REMOTE_JID + "\">\n"
- + " <pubsub xmlns=\"http://jabber.org/protocol/pubsub\">\n"
- + " <items node=\"wavelet\">\n"
- + " <delta-history xmlns=\"http://waveprotocol.org/protocol/0.2/waveserver\""
- + " start-version=\"" + START_VERSION.getVersion() + "\""
- + " start-version-hash=\"" + Base64Util.encode(START_VERSION.getHistoryHash()) + "\""
- + " end-version=\"" + VERSION_ONE.getVersion() + "\""
- + " end-version-hash=\"" + Base64Util.encode(VERSION_ONE.getHistoryHash()) + "\""
- + " wavelet-name=\"" + uri + "\"/>\n"
- + " </items>\n"
- + " </pubsub>\n"
- + "</iq>";
- } catch (EncodingException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void setUp() {
- XmppUtil.fakeUniqueId = TEST_ID;
-
- mockProvider = mock(WaveletFederationProvider.class);
- mockUpdateListener = mock(WaveletFederationListener.class);
- mockUpdateListenerFactory = mock(WaveletFederationListener.Factory.class);
-
- when(mockUpdateListenerFactory.listenerForDomain(eq(REMOTE_DOMAIN)))
- .thenReturn(mockUpdateListener);
-
- // Create mockDisco. It wants an XmppManager, but we don't need to set it here.
- disco = new MockDisco();
-
- transport = new MockOutgoingPacketTransport();
- remote = new XmppFederationRemote(mockUpdateListenerFactory, disco,
- ConfigFactory.parseString("federation.xmpp_jid : " + LOCAL_JID));
- host = new XmppFederationHost(mockProvider, disco,
- ConfigFactory.parseString("federation.xmpp_jid : " + REMOTE_JID));
- manager = new XmppManager(host, remote, disco, transport,
- ConfigFactory.parseString("federation.xmpp_jid : " + LOCAL_JID));
-
- remote.setManager(manager);
- }
-
- /**
- * Tests that the constructor behaves as expected.
- */
- public void testConstructor() {
- assertEquals(0, transport.packetsSent);
- }
-
- /**
- * Tests that a submit request from a local wave server is sent out to the
- * foreign federation host, and that the response from it is passed back to
- * the wave server.
- */
- public void testSubmitRequest() {
- int OPS_APPLIED = 1;
- long TIMESTAMP_APPLIED = 123;
- ProtocolHashedVersion APPLIED_AT = ProtocolHashedVersion.newBuilder()
- .setVersion(VERSION_ONE.getVersion() + OPS_APPLIED)
- .setHistoryHash(ByteString.copyFromUtf8("blah")).build();
-
- disco.testInjectInDomainToJidMap(REMOTE_DOMAIN, REMOTE_JID);
-
- SubmitResultListener listener = mock(SubmitResultListener.class);
- remote.submitRequest(REMOTE_WAVELET, DUMMY_SIGNED_DELTA, listener);
- verifyZeroInteractions(listener);
- assertEquals(1, transport.packetsSent);
-
- // Validate the outgoing request.
- IQ outgoingRequest = (IQ) transport.packets.poll();
- assertEquals(EXPECTED_SUBMIT_REQUEST, outgoingRequest.toString());
-
- // Send the outgoing request back to the manager, so it hooks up to the
- // Federation Host.
- manager.receivePacket(outgoingRequest);
-
- // Provide the remote's host with a dummy answer to verified input.
- ArgumentCaptor<SubmitResultListener> remoteListener =
- ArgumentCaptor.forClass(SubmitResultListener.class);
- verify(mockProvider)
- .submitRequest(eq(REMOTE_WAVELET), eq(DUMMY_SIGNED_DELTA), remoteListener.capture());
- remoteListener.getValue().onSuccess(OPS_APPLIED, APPLIED_AT, TIMESTAMP_APPLIED);
-
- // Confirm that the packet has been sent back out over the transport.
- assertEquals(2, transport.packetsSent);
- IQ historyResponse = (IQ) transport.packets.poll();
- manager.receivePacket(historyResponse);
-
- // Confirm that the success is finally delivered to the listener.
- verify(listener, never()).onFailure(any(FederationError.class));
- verify(listener)
- .onSuccess(eq(OPS_APPLIED), any(ProtocolHashedVersion.class), eq(TIMESTAMP_APPLIED));
- }
-
- /**
- * Tests that that a submit request sent out can properly process a resulting
- * error.
- */
- public void testSubmitRequestError() {
- disco.testInjectInDomainToJidMap(REMOTE_DOMAIN, REMOTE_JID);
-
- SubmitResultListener listener = mock(SubmitResultListener.class);
- remote.submitRequest(REMOTE_WAVELET, DUMMY_SIGNED_DELTA, listener);
-
- verifyZeroInteractions(listener);
- assertEquals(1, transport.packetsSent);
-
- // Validate the outgoing request.
- IQ outgoingRequest = (IQ) transport.packets.poll();
- assertEquals(EXPECTED_SUBMIT_REQUEST, outgoingRequest.toString());
-
- // Return a confusing error response (<registration-required>).
- IQ errorResponse = IQ.createResultIQ(outgoingRequest);
- errorResponse.setError(PacketError.Condition.registration_required);
- manager.receivePacket(errorResponse);
-
- // Confirm error is passed through to the callback.
- ArgumentCaptor<FederationError> error = ArgumentCaptor.forClass(FederationError.class);
- verify(listener).onFailure(error.capture());
- verify(listener, never())
- .onSuccess(anyInt(), any(ProtocolHashedVersion.class), anyLong());
- assertEquals(FederationError.Code.UNDEFINED_CONDITION, error.getValue().getErrorCode());
- }
-
- /**
- * Tests that a submit request doesn't fall over if disco fails, but instead
- * passes an error back to the wave server.
- */
- public void testSubmitRequestDiscoFailed() {
- disco.testInjectInDomainToJidMap(REMOTE_DOMAIN, null);
-
- SubmitResultListener listener = mock(SubmitResultListener.class);
- ProtocolSignedDelta signedDelta =
- ProtocolSignedDelta.newBuilder().setDelta(ByteString.copyFromUtf8("fake")).build();
- remote.submitRequest(REMOTE_WAVELET, signedDelta, listener);
- verify(listener).onFailure(any(FederationError.class));
- verify(listener, never())
- .onSuccess(anyInt(), any(ProtocolHashedVersion.class), anyLong());
- }
-
- /**
- * Tests that a history request from a local wave server is sent out to the
- * foreign federation host, and that the response from it is passed back to
- * the wave server.
- */
- public void testHistoryRequest() {
- disco.testInjectInDomainToJidMap(REMOTE_DOMAIN, REMOTE_JID);
-
- // Send the outgoing request. Assert that a packet is sent and that no
- // callbacks have been invoked.
- HistoryResponseListener listener = mock(HistoryResponseListener.class);
- remote.requestHistory(REMOTE_WAVELET, REMOTE_DOMAIN, START_VERSION, VERSION_ONE, -1, listener);
- verifyZeroInteractions(listener);
- assertEquals(1, transport.packetsSent);
-
- // Validate the outgoing request.
- IQ outgoingRequest = (IQ) transport.packets.poll();
- assertEquals(EXPECTED_HISTORY_REQUEST, outgoingRequest.toString());
-
- // Send the outgoing request back to the manager, so it hooks up to the
- // Federation Host.
- manager.receivePacket(outgoingRequest);
-
- ArgumentCaptor<HistoryResponseListener> remoteListener =
- ArgumentCaptor.forClass(HistoryResponseListener.class);
- // TODO(thorogood): Note that the caller's JID is not the domain we expect
- // here - it is not actually the domain of the requester!
- verify(mockProvider).requestHistory(eq(REMOTE_WAVELET), eq(LOCAL_JID), eq(START_VERSION),
- eq(VERSION_ONE), anyInt(), remoteListener.capture());
- remoteListener.getValue().onSuccess(ImmutableList.of(DELTA_BYTESTRING), VERSION_ONE, 0);
-
- // Confirm that the packet has been sent back out over the transport.
- assertEquals(2, transport.packetsSent);
- IQ historyResponse = (IQ) transport.packets.poll();
- manager.receivePacket(historyResponse);
-
- // Confirm that the success is finally delivered to the listener.
- ArgumentCaptor<ProtocolHashedVersion> commitVersion =
- ArgumentCaptor.forClass(ProtocolHashedVersion.class);
- verify(listener, never()).onFailure(any(FederationError.class));
- verify(listener).onSuccess(eq(ImmutableList.of(DELTA_BYTESTRING)),
- commitVersion.capture(), anyInt());
-
- // Confirm that the returned commit notice matches the expected value.
- // TODO(thorogood): We don't transfer the history hash over the wire.
- assertEquals(VERSION_ONE.getVersion(), commitVersion.getValue().getVersion());
- assertEquals(ByteString.EMPTY, commitVersion.getValue().getHistoryHash());
- }
-
- /**
- * Helper method wrapping an unchecked mock conversion.
- */
- @SuppressWarnings("unchecked")
- private static List<ByteString> anyListByteString() {
- return any(List.class);
- }
-
- /**
- * Tests that a submit request doesn't fall over if disco fails, but instead
- * passes an error back to the wave server.
- */
- public void testHistoryRequestDiscoFailed() {
- disco.testInjectInDomainToJidMap(REMOTE_DOMAIN, null);
-
- HistoryResponseListener listener = mock(HistoryResponseListener.class);
- ProtocolSignedDelta signedDelta =
- ProtocolSignedDelta.newBuilder().setDelta(ByteString.copyFromUtf8("fake")).build();
- remote.requestHistory(REMOTE_WAVELET, REMOTE_DOMAIN, START_VERSION, VERSION_ONE, -1, listener);
- verify(listener).onFailure(any(FederationError.class));
- verify(listener, never())
- .onSuccess(anyListByteString(), any(ProtocolHashedVersion.class), anyLong());
- }
-
- /**
- * Test a successful get signer.
- */
- public void testGetSigner() {
- disco.testInjectInDomainToJidMap(REMOTE_DOMAIN, REMOTE_JID);
-
- // Send the outgoing request. Assert that a packet is sent and that no
- // callbacks have been invoked.
- DeltaSignerInfoResponseListener listener = mock(DeltaSignerInfoResponseListener.class);
- remote.getDeltaSignerInfo(FAKE_SIGNER_ID, REMOTE_WAVELET, VERSION_ONE, listener);
- verifyZeroInteractions(listener);
- assertEquals(1, transport.packetsSent);
-
- // Validate the outgoing request.
- IQ outgoingRequest = (IQ) transport.packets.poll();
- //assertEquals(EXPECTED_HISTORY_REQUEST, outgoingRequest.toString());
-
- // Send the outgoing request back to the manager, so it hooks up to the
- // Federation Host.
- manager.receivePacket(outgoingRequest);
-
- ArgumentCaptor<DeltaSignerInfoResponseListener> remoteListener =
- ArgumentCaptor.forClass(DeltaSignerInfoResponseListener.class);
- verify(mockProvider).getDeltaSignerInfo(eq(FAKE_SIGNER_ID), eq(REMOTE_WAVELET), eq(VERSION_ONE),
- remoteListener.capture());
- remoteListener.getValue().onSuccess(FAKE_SIGNER_INFO);
-
- // Confirm that the packet has been sent back out over the transport.
- assertEquals(2, transport.packetsSent);
- IQ historyResponse = (IQ) transport.packets.poll();
- manager.receivePacket(historyResponse);
-
- // Confirm that the success is finally delivered to the listener.
- verify(listener, never()).onFailure(any(FederationError.class));
- verify(listener).onSuccess(eq(FAKE_SIGNER_INFO));
- }
-
- /**
- * Test a successful post signer.
- */
- public void testPostSigner() {
- disco.testInjectInDomainToJidMap(REMOTE_DOMAIN, REMOTE_JID);
-
- // Send the outgoing request. Assert that a packet is sent and that no
- // callbacks have been invoked.
- PostSignerInfoResponseListener listener = mock(PostSignerInfoResponseListener.class);
- remote.postSignerInfo(REMOTE_DOMAIN, FAKE_SIGNER_INFO, listener);
- verifyZeroInteractions(listener);
- assertEquals(1, transport.packetsSent);
-
- // Validate the outgoing request.
- IQ outgoingRequest = (IQ) transport.packets.poll();
- //assertEquals(EXPECTED_HISTORY_REQUEST, outgoingRequest.toString());
-
- // Send the outgoing request back to the manager, so it hooks up to the
- // Federation Host.
- manager.receivePacket(outgoingRequest);
-
- ArgumentCaptor<PostSignerInfoResponseListener> remoteListener =
- ArgumentCaptor.forClass(PostSignerInfoResponseListener.class);
- verify(mockProvider).postSignerInfo(eq(REMOTE_DOMAIN), eq(FAKE_SIGNER_INFO),
- remoteListener.capture());
- remoteListener.getValue().onSuccess();
-
- // Confirm that the packet has been sent back out over the transport.
- assertEquals(2, transport.packetsSent);
- IQ historyResponse = (IQ) transport.packets.poll();
- manager.receivePacket(historyResponse);
-
- // Confirm that the success is finally delivered to the listener.
- verify(listener, never()).onFailure(any(FederationError.class));
- verify(listener).onSuccess();
- }
-
- /**
- * Tests an update message containing both a delta and commit notice from a
- * foreign federation host is correctly decoded and passed to the Update
- * Listener Factory, and a response is sent as requested.
- */
- public void testUpdate() throws EncodingException {
- Message updateMessage = new Message();
- Element waveletUpdate = addWaveletUpdate(updateMessage, true); // request receipt
- waveletUpdate.addElement("applied-delta").addCDATA(Base64Util.encode(DELTA_BYTESTRING));
- waveletUpdate.addElement("commit-notice")
- .addAttribute("version", String.valueOf(VERSION_ONE.getVersion()))
- .addAttribute("history-hash", Base64Util.encode(VERSION_ONE.getHistoryHash()));
-
- manager.receivePacket(updateMessage);
-
- ArgumentCaptor<WaveletUpdateCallback> deltaCallback =
- ArgumentCaptor.forClass(WaveletUpdateCallback.class);
- List<ByteString> expected = ImmutableList.of(DELTA_BYTESTRING);
- verify(mockUpdateListener).waveletDeltaUpdate(eq(REMOTE_WAVELET), eq(expected),
- deltaCallback.capture());
-
- deltaCallback.getValue().onSuccess();
- assertEquals(0, transport.packetsSent); // Callback has only been invoked once.
-
- ArgumentCaptor<WaveletUpdateCallback> commitCallback =
- ArgumentCaptor.forClass(WaveletUpdateCallback.class);
- verify(mockUpdateListener).waveletCommitUpdate(eq(REMOTE_WAVELET), eq(VERSION_ONE),
- commitCallback.capture());
-
- commitCallback.getValue().onSuccess();
- assertEquals(1, transport.packetsSent); // Callback has been invoked twice, expect receipt.
- assertEquals(EXPECTED_RECEIPT_MESSAGE, transport.lastPacketSent.toString());
- }
-
- /**
- * Test that a single update message, where a receipt is not requested, is
- * correctly received and processed.
- */
- public void testUpdateNoReceipt() throws EncodingException {
- Message updateMessage = new Message();
- Element waveletUpdate = addWaveletUpdate(updateMessage, false);
- waveletUpdate.addElement("applied-delta").addCDATA(Base64Util.encode(DELTA_BYTESTRING));
-
- manager.receivePacket(updateMessage);
-
- ArgumentCaptor<WaveletUpdateCallback> deltaCallback =
- ArgumentCaptor.forClass(WaveletUpdateCallback.class);
- List<ByteString> expected = ImmutableList.of(DELTA_BYTESTRING);
- verify(mockUpdateListener).waveletDeltaUpdate(eq(REMOTE_WAVELET), eq(expected),
- deltaCallback.capture());
-
- deltaCallback.getValue().onSuccess();
- assertEquals(0, transport.packetsSent); // Do not expect a callback.
- }
-
- /**
- * Add a single wavelet-update message to the given Message. Should (probably)
- * not be called twice on the same Message.
- */
- private Element addWaveletUpdate(Message updateMessage, boolean requestReceipt)
- throws EncodingException {
- updateMessage.setFrom(REMOTE_JID);
- updateMessage.setTo(LOCAL_JID);
- updateMessage.setID(TEST_ID);
- if (requestReceipt) {
- updateMessage.addChildElement("request", XmppNamespace.NAMESPACE_XMPP_RECEIPTS);
- }
- Element event = updateMessage.addChildElement("event", XmppNamespace.NAMESPACE_PUBSUB_EVENT);
- Element waveletUpdate =
- event.addElement("items").addElement("item").addElement("wavelet-update");
- waveletUpdate.addAttribute("wavelet-name",
- XmppUtil.waveletNameCodec.waveletNameToURI(REMOTE_WAVELET));
- return waveletUpdate;
- }
-}
diff --git a/wave/src/test/java/org/waveprotocol/wave/model/supplement/SimpleWantedEvaluationSetTest.java b/wave/src/test/java/org/waveprotocol/wave/model/supplement/SimpleWantedEvaluationSetTest.java
index df80ef7..21c0d6e 100644
--- a/wave/src/test/java/org/waveprotocol/wave/model/supplement/SimpleWantedEvaluationSetTest.java
+++ b/wave/src/test/java/org/waveprotocol/wave/model/supplement/SimpleWantedEvaluationSetTest.java
@@ -45,7 +45,7 @@
EVALUATION_2 =
new SimpleWantedEvaluation(TEST_ID, ADDER, false, 0.2f, 1010, "test2", false, "");
EVALUATION_3 =
- new SimpleWantedEvaluation(TEST_ID, ADDER, true, 0.3f, 1010, "agent", true, "");
+ new SimpleWantedEvaluation(TEST_ID, ADDER, true, 0.3f, 1011, "agent", true, "");
}
public void testBasic() {