GOSSIP-81 Move Jackson and UDP to their own modules
Part of what makes this work is the test implementation of TransportManager.
This PR is pretty straightforward. A few gotchas though:
* A message signing test was moved into `JacksonTests` because that is
where the signing actually happens.
* A CRDT serializing test was moved there as well. It's the best place
for now.
* No UDP tests at all. I plan to fix that in a bit. Reasoning is that it is
difficult to test any TransportManager implementation without bring up
a full stack. I plan to address this in the future (GOSSIP-83).
* Simple round trip Jackson serialization tests.
diff --git a/gossip-base/pom.xml b/gossip-base/pom.xml
index 3529bd1..b9739f6 100644
--- a/gossip-base/pom.xml
+++ b/gossip-base/pom.xml
@@ -1,4 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -36,36 +53,6 @@
<artifactId>metrics-core</artifactId>
<version>${metrics.version}</version></dependency>
<dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-api</artifactId>
- <version>${junit.jupiter.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
- <version>${junit.jupiter.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.junit.vintage</groupId>
- <artifactId>junit-vintage-engine</artifactId>
- <version>${junit.vintage.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.junit.platform</groupId>
- <artifactId>junit-platform-runner</artifactId>
- <version>${junit.platform.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>io.teknek</groupId>
- <artifactId>tunit</artifactId>
- <version>${tunit.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
index e4a95d3..2ceb453 100644
--- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
@@ -45,8 +45,8 @@
private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper";
- private String transportManagerClass = "org.apache.gossip.transport.UdpTransportManager";
- private String protocolManagerClass = "org.apache.gossip.protocol.JacksonProtocolManager";
+ private String transportManagerClass = "org.apache.gossip.transport.udp.UdpTransportManager";
+ private String protocolManagerClass = "org.apache.gossip.protocol.json.JacksonProtocolManager";
private Map<String,String> activeGossipProperties = new HashMap<>();
@@ -230,7 +230,15 @@
return transportManagerClass;
}
+ public void setTransportManagerClass(String transportManagerClass) {
+ this.transportManagerClass = transportManagerClass;
+ }
+
public String getProtocolManagerClass() {
return protocolManagerClass;
}
+
+ public void setProtocolManagerClass(String protocolManagerClass) {
+ this.protocolManagerClass = protocolManagerClass;
+ }
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java b/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java
index 17eaaf2..dd30e88 100644
--- a/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java
+++ b/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java
@@ -185,10 +185,22 @@
if (cluster == null){
throw new IllegalArgumentException("cluster was null. It is required");
}
+ String transportClass = jsonObject.has("transport_manager_class") ?
+ jsonObject.get("transport_manager_class").textValue() :
+ null;
+ String protocolClass = jsonObject.has("protocol_manager_class") ?
+ jsonObject.get("protocol_manager_class").textValue() :
+ null;
URI uri2 = new URI(uri);
- StartupSettings settings = new StartupSettings(id, uri2,
- new GossipSettings(gossipInterval, cleanupInterval, windowSize,
- minSamples, convictThreshold, distribution), cluster);
+ GossipSettings gossipSettings = new GossipSettings(gossipInterval, cleanupInterval, windowSize, minSamples,
+ convictThreshold, distribution);
+ if (transportClass != null) {
+ gossipSettings.setTransportManagerClass(transportClass);
+ }
+ if (protocolClass != null) {
+ gossipSettings.setProtocolManagerClass(protocolClass);
+ }
+ StartupSettings settings = new StartupSettings(id, uri2, gossipSettings, cluster);
String configMembersDetails = "Config-members [";
JsonNode membersJSON = jsonObject.get("members");
Iterator<JsonNode> it = membersJSON.iterator();
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index 30e39d5..03a874c 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -61,7 +61,10 @@
LOGGER.error("Unable to process message", ex);
}
} catch (IOException e) {
- LOGGER.error(e);
+ // InterruptedException are completely normal here because of the blocking lifecycle.
+ if (!(e.getCause() instanceof InterruptedException)) {
+ LOGGER.error(e);
+ }
keepRunning.set(false);
}
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java
index 497e605..33db038 100644
--- a/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java
@@ -66,7 +66,8 @@
try {
boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
if (!result) {
- LOGGER.error("executor shutdown timed out");
+ // common when blocking patterns are used to read data from a socket.
+ LOGGER.warn("executor shutdown timed out");
}
} catch (InterruptedException e) {
LOGGER.error(e);
diff --git a/gossip-base/src/test/java/org/apache/gossip/DataTest.java b/gossip-base/src/test/java/org/apache/gossip/DataTest.java
index f0c2186..bb33dc2 100644
--- a/gossip-base/src/test/java/org/apache/gossip/DataTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/DataTest.java
@@ -47,6 +47,8 @@
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
+ settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
+ settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
diff --git a/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java
index 7f550de..1b6a32a 100644
--- a/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java
@@ -43,6 +43,8 @@
public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException {
GossipSettings settings = new GossipSettings();
settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
+ settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
+ settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
List<Member> startupMembers = new ArrayList<>();
Map<String, String> x = new HashMap<>();
x.put("a", "b");
diff --git a/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index 54005c3..30c52bc 100644
--- a/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -44,10 +44,14 @@
private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class);
+ // Note: this test is floppy depending on the values in GossipSettings (smaller values seem to do harm), and the
+ // sleep that happens after startup.
@Test
public void DeadNodesDoNotComeAliveAgain()
throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal");
+ settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
+ settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
@@ -70,7 +74,7 @@
.build();
clients.add(gossipService);
gossipService.init();
-
+ Thread.sleep(1000);
}
TUnit.assertThat(new Callable<Integer>() {
public Integer call() throws Exception {
diff --git a/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java b/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java
index 5c3bb76..f669a23 100644
--- a/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.util.ArrayList;
@@ -41,28 +40,13 @@
public class SignedMessageTest extends AbstractIntegrationBase {
- @Test(expected = IllegalArgumentException.class)
- public void ifSignMustHaveKeys()
- throws URISyntaxException, UnknownHostException, InterruptedException {
- String cluster = UUID.randomUUID().toString();
- GossipSettings settings = gossiperThatSigns();
- List<Member> startupMembers = new ArrayList<>();
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + 1));
- GossipManager gossipService = GossipManagerBuilder.newBuilder()
- .cluster(cluster)
- .uri(uri)
- .id(1 + "")
- .gossipMembers(startupMembers)
- .gossipSettings(settings)
- .build();
- gossipService.init();
- }
-
private GossipSettings gossiperThatSigns(){
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
settings.setSignMessages(true);
+ settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
+ settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
return settings;
}
diff --git a/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java b/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java
index d6c4a1e..ea93a90 100644
--- a/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java
@@ -47,11 +47,14 @@
settingsFile.deleteOnExit();
writeSettingsFile(settingsFile);
URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000);
+ GossipSettings firstGossipSettings = new GossipSettings();
+ firstGossipSettings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
+ firstGossipSettings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
GossipManager firstService = GossipManagerBuilder.newBuilder()
.cluster(CLUSTER)
.uri(uri)
.id("1")
- .gossipSettings(new GossipSettings()).build();
+ .gossipSettings(firstGossipSettings).build();
firstService.init();
GossipManager manager = GossipManagerBuilder.newBuilder()
.startupSettings(StartupSettings.fromJSONFile(settingsFile)).build();
@@ -72,6 +75,8 @@
" \"cleanup_interval\":10000,\n" +
" \"convict_threshold\":2.6,\n" +
" \"distribution\":\"exponential\",\n" +
+ " \"transport_manager_class\":\"org.apache.gossip.transport.UnitTestTransportManager\",\n" +
+ " \"protocol_manager_class\":\"org.apache.gossip.protocol.UnitTestProtocolManager\",\n" +
" \"properties\":{},\n" +
" \"members\":[\n" +
" {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" +
diff --git a/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index 8ae783e..c6d7d46 100644
--- a/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -50,6 +50,8 @@
GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential");
settings.setPersistRingState(false);
settings.setPersistDataState(false);
+ settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
+ settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
String cluster = UUID.randomUUID().toString();
int seedNodes = 3;
List<Member> startupMembers = new ArrayList<>();
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
index 4c6014a..70c0d51 100644
--- a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
@@ -17,15 +17,10 @@
*/
package org.apache.gossip.crdt;
-import java.io.IOException;
-import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.protocol.JacksonProtocolManager;
import org.junit.Assert;
import org.junit.Test;
@@ -86,16 +81,6 @@
}
@Test
- public void serialTest() throws InterruptedException, URISyntaxException, IOException {
- ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(new GossipSettings());
- OrSet<Integer> i = new OrSet<Integer>(new OrSet.Builder<Integer>().add(1).remove(1));
- String s = objectMapper.writeValueAsString(i);
- @SuppressWarnings("unchecked")
- OrSet<Integer> back = objectMapper.readValue(s, OrSet.class);
- Assert.assertEquals(back, i);
- }
-
- @Test
public void mergeTestSame() {
OrSet<Integer> i = new OrSet<>(19);
OrSet<Integer> j = new OrSet<>(19);
diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/DataReaperTest.java
index e328c24..1a9d43b 100644
--- a/gossip-base/src/test/java/org/apache/gossip/manager/DataReaperTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/manager/DataReaperTest.java
@@ -40,6 +40,8 @@
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
+ settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
+ settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
GossipManager gm = GossipManagerBuilder.newBuilder().cluster("abc").gossipSettings(settings)
.id(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build();
gm.init();
@@ -88,6 +90,8 @@
String key = "key";
String value = "a";
GossipSettings settings = new GossipSettings();
+ settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
+ settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
GossipManager gm = GossipManagerBuilder.newBuilder().cluster("abc").gossipSettings(settings)
.id(myId).uri(URI.create("udp://localhost:7000")).registry(registry).build();
gm.init();
diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
index dde4b74..e1e1127 100644
--- a/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
@@ -35,6 +35,8 @@
private GossipManager sameService() throws URISyntaxException {
GossipSettings settings = new GossipSettings();
+ settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
+ settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
return GossipManagerBuilder.newBuilder()
.cluster("a")
.uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)))
diff --git a/gossip-base/src/test/java/org/apache/gossip/protocol/UnitTestProtocolManager.java b/gossip-base/src/test/java/org/apache/gossip/protocol/UnitTestProtocolManager.java
new file mode 100644
index 0000000..3d52c4a
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/protocol/UnitTestProtocolManager.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gossip.protocol;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.manager.PassiveGossipConstants;
+import org.apache.gossip.model.Base;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+// doesn't serialize anything besides longs. Uses a static lookup table to read and write objects.
+public class UnitTestProtocolManager implements ProtocolManager {
+
+ // so it can be shared across gossipers. this works as long as each object has a different memory address.
+ private static final Map<Long, Base> lookup = new ConcurrentHashMap<>();
+ private final Meter meter;
+
+ public UnitTestProtocolManager(GossipSettings settings, String id, MetricRegistry registry) {
+ meter = settings.isSignMessages() ?
+ registry.meter(PassiveGossipConstants.SIGNED_MESSAGE) :
+ registry.meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
+ }
+
+ private static byte[] longToBytes(long val) {
+ byte[] b = new byte[8];
+ b[7] = (byte) (val);
+ b[6] = (byte) (val >>> 8);
+ b[5] = (byte) (val >>> 16);
+ b[4] = (byte) (val >>> 24);
+ b[3] = (byte) (val >>> 32);
+ b[2] = (byte) (val >>> 40);
+ b[1] = (byte) (val >>> 48);
+ b[0] = (byte) (val >>> 56);
+ return b;
+ }
+
+ static long bytesToLong(byte[] b) {
+ return ((b[7] & 0xFFL)) +
+ ((b[6] & 0xFFL) << 8) +
+ ((b[5] & 0xFFL) << 16) +
+ ((b[4] & 0xFFL) << 24) +
+ ((b[3] & 0xFFL) << 32) +
+ ((b[2] & 0xFFL) << 40) +
+ ((b[1] & 0xFFL) << 48) +
+ (((long) b[0]) << 56);
+ }
+
+ @Override
+ public byte[] write(Base message) throws IOException {
+ long hashCode = System.identityHashCode(message);
+ byte[] serialized = longToBytes(hashCode);
+ lookup.put(hashCode, message);
+ meter.mark();
+ return serialized;
+ }
+
+ @Override
+ public Base read(byte[] buf) throws IOException {
+ long hashCode = bytesToLong(buf);
+ return lookup.remove(hashCode);
+ }
+}
diff --git a/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java b/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java
new file mode 100644
index 0000000..a783b75
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gossip.transport;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Only use in unit tests! */
+public class UnitTestTransportManager extends AbstractTransportManager {
+
+ private static final Map<URI, UnitTestTransportManager> allManagers = new ConcurrentHashMap<>();
+
+ private final URI localEndpoint;
+ private BlockingQueue<byte[]> buffers = new ArrayBlockingQueue<byte[]>(1000);
+
+ public UnitTestTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
+ super(gossipManager, gossipCore);
+ localEndpoint = gossipManager.getMyself().getUri();
+ }
+
+ @Override
+ public void send(URI endpoint, byte[] buf) throws IOException {
+ if (allManagers.containsKey(endpoint)) {
+ try {
+ allManagers.get(endpoint).buffers.put(buf);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ @Override
+ public byte[] read() throws IOException {
+ try {
+ return buffers.take();
+ } catch (InterruptedException ex) {
+ // probably not the right thing to do, but we'll see.
+ throw new IOException(ex);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ allManagers.remove(localEndpoint);
+ super.shutdown();
+ }
+
+ @Override
+ public void startEndpoint() {
+ allManagers.put(localEndpoint, this);
+ super.startEndpoint();
+ }
+}
diff --git a/gossip-protocol-jackson/pom.xml b/gossip-protocol-jackson/pom.xml
new file mode 100644
index 0000000..067a27e
--- /dev/null
+++ b/gossip-protocol-jackson/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.gossip</groupId>
+ <artifactId>gossip-parent</artifactId>
+ <version>0.1.3-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <name>Gossip Jackson Protocol</name>
+ <artifactId>gossip-protocol-jackson</artifactId>
+ <version>0.1.3-incubating-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.gossip</groupId>
+ <artifactId>gossip-base</artifactId>
+ <version>0.1.3-incubating-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gossip</groupId>
+ <artifactId>gossip-base</artifactId>
+ <version>0.1.3-incubating-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java b/gossip-protocol-jackson/src/main/java/org/apache/gossip/protocol/json/JacksonProtocolManager.java
similarity index 97%
rename from gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java
rename to gossip-protocol-jackson/src/main/java/org/apache/gossip/protocol/json/JacksonProtocolManager.java
index 91ed7f9..499c5ee 100644
--- a/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java
+++ b/gossip-protocol-jackson/src/main/java/org/apache/gossip/protocol/json/JacksonProtocolManager.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gossip.protocol;
+package org.apache.gossip.protocol.json;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
@@ -26,6 +26,7 @@
import org.apache.gossip.manager.PassiveGossipConstants;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.SignedPayload;
+import org.apache.gossip.protocol.ProtocolManager;
import java.io.File;
import java.io.FileInputStream;
diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java
new file mode 100644
index 0000000..bd8a949
--- /dev/null
+++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gossip.protocol.json;
+
+import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.Member;
+import org.apache.gossip.crdt.OrSet;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.protocol.ProtocolManager;
+import org.apache.gossip.udp.Trackable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+public class JacksonTest {
+
+ private static GossipSettings simpleSettings(GossipSettings settings) {
+ settings.setPersistRingState(false);
+ settings.setPersistDataState(false);
+ settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
+ settings.setProtocolManagerClass("org.apache.gossip.protocol.json.JacksonProtocolManager");
+ return settings;
+ }
+
+ private static GossipSettings withSigning(GossipSettings settings) {
+ settings.setSignMessages(true);
+ return settings;
+ }
+
+ // formerly of SignedMessageTest.
+ @Test(expected = IllegalArgumentException.class)
+ public void ifSignMustHaveKeys()
+ throws URISyntaxException, UnknownHostException, InterruptedException {
+ String cluster = UUID.randomUUID().toString();
+ GossipSettings settings = withSigning(simpleSettings(new GossipSettings()));
+ List<Member> startupMembers = new ArrayList<>();
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + 1));
+ GossipManager gossipService = GossipManagerBuilder.newBuilder()
+ .cluster(cluster)
+ .uri(uri)
+ .id(1 + "")
+ .gossipMembers(startupMembers)
+ .gossipSettings(settings)
+ .build();
+ gossipService.init();
+ }
+
+ @Test
+ public void jacksonSerialTest() throws InterruptedException, URISyntaxException, IOException {
+ ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(simpleSettings(new GossipSettings()));
+
+ OrSet<Integer> i = new OrSet<Integer>(new OrSet.Builder<Integer>().add(1).remove(1));
+ String s = objectMapper.writeValueAsString(i);
+ @SuppressWarnings("unchecked")
+ OrSet<Integer> back = objectMapper.readValue(s, OrSet.class);
+ Assert.assertEquals(back, i);
+ }
+
+ @Test
+ public void testMessageEqualityAssumptions() {
+ long timeA = System.nanoTime();
+ long timeB = System.nanoTime();
+ Assert.assertNotEquals(timeA, timeB);
+
+ TestMessage messageA0 = new TestMessage(Long.toHexString(timeA));
+ TestMessage messageA1 = new TestMessage(Long.toHexString(timeA));
+ TestMessage messageB = new TestMessage(Long.toHexString(timeB));
+
+ Assert.assertEquals(messageA0, messageA1);
+ Assert.assertFalse(messageA0 == messageA1);
+ Assert.assertNotEquals(messageA0, messageB);
+ Assert.assertNotEquals(messageA1, messageB);
+ }
+
+ // ideally, we would test the serializability of every message type, but we just want to make sure this works in
+ // basic cases.
+ @Test
+ public void testMessageSerializationRoundTrip() throws Exception {
+ ProtocolManager mgr = new JacksonProtocolManager(simpleSettings(new GossipSettings()), "foo", new MetricRegistry());
+ for (int i = 0; i < 100; i++) {
+ TestMessage a = new TestMessage(Long.toHexString(System.nanoTime()));
+ byte[] bytes = mgr.write(a);
+ TestMessage b = (TestMessage) mgr.read(bytes);
+ Assert.assertFalse(a == b);
+ Assert.assertEquals(a, b);
+ Assert.assertEquals(a.getMapOfThings(), b.getMapOfThings()); // concerned about that one, so explicit check.
+ }
+ }
+}
diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java
new file mode 100644
index 0000000..43032de
--- /dev/null
+++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.protocol.json;
+
+import org.apache.gossip.model.Base;
+import org.apache.gossip.udp.Trackable;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/*
+ * Here is a test class for serialization. I've tried to include a lot of things in it including nested classes.
+ * Note that there are no Jackson annotations.
+ * getters and setters are the keys to making this work without the Jackson annotations.
+ */
+class TestMessage extends Base implements Trackable {
+ private String unique;
+ private String from;
+ private String uuid;
+ private String derivedField;
+ private Subclass otherThing;
+ private float floatValue;
+ private double doubleValue;
+ private Object[] arrayOfThings;
+ private Map<String, String> mapOfThings = new HashMap<>();
+
+ private TestMessage() {
+ }
+
+ TestMessage(String unique) {
+ this.unique = unique;
+ from = Integer.toHexString(unique.hashCode());
+ uuid = Integer.toHexString(from.hashCode());
+ derivedField = Integer.toHexString(uuid.hashCode());
+ otherThing = new Subclass(Integer.toHexString(derivedField.hashCode()));
+ floatValue = (float) unique.hashCode() / (float) from.hashCode();
+ doubleValue = (double) uuid.hashCode() / (double) derivedField.hashCode();
+ arrayOfThings = new Object[]{
+ this.unique, from, uuid, derivedField, otherThing, floatValue, doubleValue
+ };
+
+ String curThing = unique;
+ for (int i = 0; i < 100; i++) {
+ String key = Integer.toHexString(curThing.hashCode());
+ String value = Integer.toHexString(key.hashCode());
+ curThing = value;
+ mapOfThings.put(key, value);
+ }
+ }
+
+ @Override
+ public String getUriFrom() {
+ return from;
+ }
+
+ @Override
+ public void setUriFrom(String uriFrom) {
+ this.from = uriFrom;
+ }
+
+ @Override
+ public String getUuid() {
+ return uuid;
+ }
+
+ @Override
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof TestMessage)) return false;
+ TestMessage that = (TestMessage) o;
+ return Objects.equals(unique, that.unique) &&
+ Objects.equals(from, that.from) &&
+ Objects.equals(getUuid(), that.getUuid()) &&
+ Objects.equals(derivedField, that.derivedField) &&
+ Objects.equals(floatValue, that.floatValue) &&
+ Objects.equals(doubleValue, that.doubleValue) &&
+ Arrays.equals(arrayOfThings, that.arrayOfThings) &&
+ Objects.equals(mapOfThings, that.mapOfThings);
+ }
+
+ public String getUnique() {
+ return unique;
+ }
+
+ public void setUnique(String unique) {
+ this.unique = unique;
+ }
+
+ public String getFrom() {
+ return from;
+ }
+
+ public void setFrom(String from) {
+ this.from = from;
+ }
+
+ public String getDerivedField() {
+ return derivedField;
+ }
+
+ public void setDerivedField(String derivedField) {
+ this.derivedField = derivedField;
+ }
+
+ public Subclass getOtherThing() {
+ return otherThing;
+ }
+
+ public void setOtherThing(Subclass otherThing) {
+ this.otherThing = otherThing;
+ }
+
+ public float getFloatValue() {
+ return floatValue;
+ }
+
+ public void setFloatValue(float floatValue) {
+ this.floatValue = floatValue;
+ }
+
+ public double getDoubleValue() {
+ return doubleValue;
+ }
+
+ public void setDoubleValue(double doubleValue) {
+ this.doubleValue = doubleValue;
+ }
+
+ public Object[] getArrayOfThings() {
+ return arrayOfThings;
+ }
+
+ public void setArrayOfThings(Object[] arrayOfThings) {
+ this.arrayOfThings = arrayOfThings;
+ }
+
+ public Map<String, String> getMapOfThings() {
+ return mapOfThings;
+ }
+
+ public void setMapOfThings(Map<String, String> mapOfThings) {
+ this.mapOfThings = mapOfThings;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(unique, getUriFrom(), getUuid(), derivedField, floatValue, doubleValue, arrayOfThings, mapOfThings);
+ }
+
+ static class Subclass {
+ private String thing;
+
+ public Subclass() {
+ }
+
+ public Subclass(String thing) {
+ this.thing = thing;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof Subclass)) return false;
+ Subclass subclass = (Subclass) o;
+ return Objects.equals(thing, subclass.thing);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(thing);
+ }
+
+ public String getThing() {
+ return thing;
+ }
+ }
+}
\ No newline at end of file
diff --git a/gossip-transport-udp/pom.xml b/gossip-transport-udp/pom.xml
new file mode 100644
index 0000000..2e79b1a
--- /dev/null
+++ b/gossip-transport-udp/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.gossip</groupId>
+ <artifactId>gossip-parent</artifactId>
+ <version>0.1.3-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <name>Gossip UDP Transport</name>
+ <artifactId>gossip-transport-udp</artifactId>
+ <version>0.1.3-incubating-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.gossip</groupId>
+ <artifactId>gossip-base</artifactId>
+ <version>0.1.3-incubating-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java b/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java
similarity index 96%
rename from gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java
rename to gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java
index d80deec..3f509a6 100644
--- a/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java
+++ b/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java
@@ -15,10 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gossip.transport;
+package org.apache.gossip.transport.udp;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.transport.AbstractTransportManager;
import org.apache.log4j.Logger;
import java.io.IOException;
diff --git a/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java b/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java
new file mode 100644
index 0000000..5258374
--- /dev/null
+++ b/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.transport.udp;
+
+import org.apache.gossip.GossipSettings;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class UdpTransportIntegrationTest {
+
+ // It's currently impossible to create a UdpTransportManager without bringing up an entire stack.
+ // This is because AbstractTransportManager creates a PassiveGossipThread (requires GossipManager,
+ // GossipCore) and also requires those same things plus a MetricsRegistry to create the
+ // ActiveGossiper.
+ // TODO: test UDPTransportManger semantics (read and write) in isolation.
+ // I've written this test to indicate the direction I want things to go.
+ // Uncomment/Fix it once the coupling issues are worked out.
+ @Test @Ignore
+ public void testRoundTrip() {
+ /*
+ GossipSettings settings0 = new GossipSettings();
+ GossipSettings settings1 = new GossipSettings();
+ UdpTransportManager mgr0 = new UdpTransportManager(settings0);
+ UdpTransportManager mgr1 = new UdpTransportManager(settings1);
+
+ mgr0.startEndpoint();
+ mgr1.startEndpoint();
+ mgr0.startActiveGossiper();
+ mgr1.startActiveGossiper();
+
+ // wait a little while for convergence
+ // perhaps there is a Mockito Whitebox way to foce members
+
+ byte[] data = new byte[] {0,1,2,3,4,5};
+ Future<byte[]> someData = asyncWaitForData(mgr1);
+ mgr0.send(toURI(settings1), data);
+
+ Assert.assertEquals(data, someData.get(1000, TimeUnit.MILLISECONDS));
+
+ mgr0.shutdown();
+ mgr1.shutdown();
+ */
+ }
+
+
+}
diff --git a/pom.xml b/pom.xml
index f9c7814..97aa409 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,8 @@
<modules>
<module>gossip-base</module>
+ <module>gossip-transport-udp</module>
+ <module>gossip-protocol-jackson</module>
</modules>
<description>A peer to peer cluster discovery service</description>
@@ -81,6 +83,39 @@
<url>https://issues.apache.org/jira/browse/GOSSIP</url>
</issueManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>${junit.jupiter.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <version>${junit.jupiter.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.vintage</groupId>
+ <artifactId>junit-vintage-engine</artifactId>
+ <version>${junit.vintage.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-runner</artifactId>
+ <version>${junit.platform.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.teknek</groupId>
+ <artifactId>tunit</artifactId>
+ <version>${tunit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
<build>
<pluginManagement>
<plugins>