GOSSIP-46 Refactor away GossipService cleaner better class names
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
deleted file mode 100644
index f216c33..0000000
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip;
-
-import com.codahale.metrics.MetricRegistry;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.codahale.metrics.JmxReporter;
-import org.apache.gossip.event.GossipListener;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.random.RandomGossipManager;
-import org.apache.gossip.model.GossipDataMessage;
-import org.apache.gossip.model.SharedGossipDataMessage;
-import org.apache.log4j.Logger;
-
-/**
- * This object represents the service which is responsible for gossiping with other gossip members.
- *
- */
-public class GossipService {
-
- public static final Logger LOGGER = Logger.getLogger(GossipService.class);
- private final JmxReporter jmxReporter;
-
- private final GossipManager gossipManager;
-
- /**
- * Constructor with the default settings.
- *
- * @throws InterruptedException
- * @throws UnknownHostException
- */
- public GossipService(StartupSettings startupSettings) throws InterruptedException,
- UnknownHostException {
- this(startupSettings.getCluster(), startupSettings.getUri()
- , startupSettings.getId(), new HashMap<String,String> (),startupSettings.getGossipMembers(),
- startupSettings.getGossipSettings(), null, new MetricRegistry());
- }
-
- /**
- * Setup the client's lists, gossiping parameters, and parse the startup config file.
- *
- * @throws InterruptedException
- * @throws UnknownHostException
- */
- public GossipService(String cluster, URI uri, String id, Map<String,String> properties,
- List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener, MetricRegistry registry)
- throws InterruptedException, UnknownHostException {
- jmxReporter = JmxReporter.forRegistry(registry).build();
- jmxReporter.start();
- gossipManager = RandomGossipManager.newBuilder()
- .withId(id)
- .cluster(cluster)
- .uri(uri)
- .settings(settings)
- .gossipMembers(gossipMembers)
- .listener(listener)
- .registry(registry)
- .properties(properties)
- .build();
- }
-
- public void start() {
- gossipManager.init();
- }
-
- public void shutdown() {
- gossipManager.shutdown();
- }
-
- public GossipManager getGossipManager() {
- return gossipManager;
- }
-
- /**
- * Gossip data in a namespace that is per-node { node-id { key, value } }
- * @param message
- * message to be gossip'ed across the cluster
- */
- public void gossipPerNodeData(GossipDataMessage message){
- gossipManager.gossipPerNodeData(message);
- }
-
- /**
- * Retrieve per-node gossip data by key
- *
- * @param nodeId
- * the id of the node that owns the data
- * @param key
- * the key in the per-node map to find the data
- * @return the value if found or null if not found or expired
- */
- public GossipDataMessage findPerNodeData(String nodeId, String key){
- return getGossipManager().findPerNodeGossipData(nodeId, key);
- }
-
- /**
- *
- * @param message
- * Shared data to gossip around the cluster
- */
- public void gossipSharedData(SharedGossipDataMessage message){
- gossipManager.gossipSharedData(message);
- }
-
- /**
- *
- * @param key
- * the key to search for
- * @return the value associated with given key
- */
- public SharedGossipDataMessage findSharedData(String key){
- return getGossipManager().findSharedGossipData(key);
- }
-
-}
diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalMember.java
similarity index 92%
rename from src/main/java/org/apache/gossip/LocalGossipMember.java
rename to src/main/java/org/apache/gossip/LocalMember.java
index 05874f5..db6e3f7 100644
--- a/src/main/java/org/apache/gossip/LocalGossipMember.java
+++ b/src/main/java/org/apache/gossip/LocalMember.java
@@ -27,7 +27,7 @@
* in the local list of gossip members.
*
*/
-public class LocalGossipMember extends GossipMember {
+public class LocalMember extends Member {
/** The failure detector for this member */
private transient FailureDetector detector;
@@ -40,13 +40,13 @@
* @param heartbeat
* The current heartbeat
*/
- public LocalGossipMember(String clusterName, URI uri, String id,
+ public LocalMember(String clusterName, URI uri, String id,
long heartbeat, Map<String,String> properties, int windowSize, int minSamples, String distribution) {
super(clusterName, uri, id, heartbeat, properties );
detector = new FailureDetector(this, minSamples, windowSize, distribution);
}
- protected LocalGossipMember(){
+ protected LocalMember(){
}
diff --git a/src/main/java/org/apache/gossip/GossipMember.java b/src/main/java/org/apache/gossip/Member.java
similarity index 89%
rename from src/main/java/org/apache/gossip/GossipMember.java
rename to src/main/java/org/apache/gossip/Member.java
index 703ac55..d04a7b6 100644
--- a/src/main/java/org/apache/gossip/GossipMember.java
+++ b/src/main/java/org/apache/gossip/Member.java
@@ -25,7 +25,7 @@
* A abstract class representing a gossip member.
*
*/
-public abstract class GossipMember implements Comparable<GossipMember> {
+public abstract class Member implements Comparable<Member> {
protected URI uri;
@@ -56,7 +56,7 @@
* @param id
* An id that may be replaced after contact
*/
- public GossipMember(String clusterName, URI uri, String id, long heartbeat, Map<String,String> properties) {
+ public Member(String clusterName, URI uri, String id, long heartbeat, Map<String,String> properties) {
this.clusterName = clusterName;
this.id = id;
this.heartbeat = heartbeat;
@@ -64,7 +64,7 @@
this.properties = properties;
}
- protected GossipMember(){}
+ protected Member(){}
/**
* Get the name of the cluster the member belongs to.
*
@@ -151,16 +151,16 @@
System.err.println("equals(): obj is null.");
return false;
}
- if (!(obj instanceof GossipMember)) {
+ if (!(obj instanceof Member)) {
System.err.println("equals(): obj is not of type GossipMember.");
return false;
}
// The object is the same of they both have the same address (hostname and port).
- return computeAddress().equals(((LocalGossipMember) obj).computeAddress())
- && getClusterName().equals(((LocalGossipMember) obj).getClusterName());
+ return computeAddress().equals(((LocalMember) obj).computeAddress())
+ && getClusterName().equals(((LocalMember) obj).getClusterName());
}
- public int compareTo(GossipMember other) {
+ public int compareTo(Member other) {
return this.computeAddress().compareTo(other.computeAddress());
}
}
diff --git a/src/main/java/org/apache/gossip/RemoteGossipMember.java b/src/main/java/org/apache/gossip/RemoteMember.java
similarity index 84%
rename from src/main/java/org/apache/gossip/RemoteGossipMember.java
rename to src/main/java/org/apache/gossip/RemoteMember.java
index e3f6620..6b42da2 100644
--- a/src/main/java/org/apache/gossip/RemoteGossipMember.java
+++ b/src/main/java/org/apache/gossip/RemoteMember.java
@@ -26,7 +26,7 @@
* member.
*
*/
-public class RemoteGossipMember extends GossipMember {
+public class RemoteMember extends Member {
/**
* Constructor.
@@ -36,11 +36,11 @@
* @param heartbeat
* The current heartbeat
*/
- public RemoteGossipMember(String clusterName, URI uri, String id, long heartbeat, Map<String,String> properties) {
+ public RemoteMember(String clusterName, URI uri, String id, long heartbeat, Map<String,String> properties) {
super(clusterName, uri, id, heartbeat, properties);
}
- public RemoteGossipMember(String clusterName, URI uri, String id) {
+ public RemoteMember(String clusterName, URI uri, String id) {
super(clusterName, uri, id, System.nanoTime(), new HashMap<String,String>());
}
diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java
index ab5f764..17eaaf2 100644
--- a/src/main/java/org/apache/gossip/StartupSettings.java
+++ b/src/main/java/org/apache/gossip/StartupSettings.java
@@ -53,7 +53,7 @@
private final GossipSettings gossipSettings;
/** The list with gossip members to start with. */
- private final List<GossipMember> gossipMembers;
+ private final List<Member> gossipMembers;
/**
* Constructor.
@@ -135,7 +135,7 @@
* @param member
* The member to add.
*/
- public void addGossipMember(GossipMember member) {
+ public void addGossipMember(Member member) {
gossipMembers.add(member);
}
@@ -144,7 +144,7 @@
*
* @return The gossip members.
*/
- public List<GossipMember> getGossipMembers() {
+ public List<Member> getGossipMembers() {
return gossipMembers;
}
@@ -195,7 +195,7 @@
while (it.hasNext()){
JsonNode child = it.next();
URI uri3 = new URI(child.get("uri").textValue());
- RemoteGossipMember member = new RemoteGossipMember(child.get("cluster").asText(),
+ RemoteMember member = new RemoteMember(child.get("cluster").asText(),
uri3, "", 0, new HashMap<String,String>());
settings.addGossipMember(member);
configMembersDetails += member.computeAddress();
diff --git a/src/main/java/org/apache/gossip/accrual/FailureDetector.java b/src/main/java/org/apache/gossip/accrual/FailureDetector.java
index 10d66a9..22e73db 100644
--- a/src/main/java/org/apache/gossip/accrual/FailureDetector.java
+++ b/src/main/java/org/apache/gossip/accrual/FailureDetector.java
@@ -21,7 +21,7 @@
import org.apache.commons.math.distribution.ExponentialDistributionImpl;
import org.apache.commons.math.distribution.NormalDistributionImpl;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
-import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.LocalMember;
import org.apache.log4j.Logger;
public class FailureDetector {
@@ -30,10 +30,10 @@
private final DescriptiveStatistics descriptiveStatistics;
private final long minimumSamples;
private volatile long latestHeartbeatMs = -1;
- private final LocalGossipMember parent;
+ private final LocalMember parent;
private final String distribution;
- public FailureDetector(LocalGossipMember parent, long minimumSamples, int windowSize, String distribution){
+ public FailureDetector(LocalMember parent, long minimumSamples, int windowSize, String distribution){
this.parent = parent;
descriptiveStatistics = new DescriptiveStatistics(windowSize);
this.minimumSamples = minimumSamples;
diff --git a/src/main/java/org/apache/gossip/event/GossipListener.java b/src/main/java/org/apache/gossip/event/GossipListener.java
index 2e882f6..9b33dab 100644
--- a/src/main/java/org/apache/gossip/event/GossipListener.java
+++ b/src/main/java/org/apache/gossip/event/GossipListener.java
@@ -17,8 +17,8 @@
*/
package org.apache.gossip.event;
-import org.apache.gossip.GossipMember;
+import org.apache.gossip.Member;
public interface GossipListener {
- void gossipEvent(GossipMember member, GossipState state);
+ void gossipEvent(Member member, GossipState state);
}
diff --git a/src/main/java/org/apache/gossip/examples/GossipExample.java b/src/main/java/org/apache/gossip/examples/GossipExample.java
deleted file mode 100644
index 8236d46..0000000
--- a/src/main/java/org/apache/gossip/examples/GossipExample.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.examples;
-
-import com.codahale.metrics.MetricRegistry;
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.gossip.GossipMember;
-import org.apache.gossip.GossipService;
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.RemoteGossipMember;
-
-/**
- * This class is an example of how one could use the gossip service. Here we start multiple gossip
- * clients on this host as specified in the config file.
- *
- * @author harmenw
- */
-public class GossipExample extends Thread {
- /** The number of clients to start. */
- private static final int NUMBER_OF_CLIENTS = 4;
-
- /**
- * @param args
- */
- public static void main(String[] args) {
- new GossipExample();
- }
-
- /**
- * Constructor. This will start the this thread.
- */
- public GossipExample() {
- start();
- }
-
- /**
- * @see java.lang.Thread#run()
- */
- public void run() {
- try {
- GossipSettings settings = new GossipSettings();
- List<GossipService> clients = new ArrayList<>();
- String myIpAddress = InetAddress.getLocalHost().getHostAddress();
- String cluster = "My Gossip Cluster";
-
- // Create the gossip members and put them in a list and give them a port number starting with
- // 2000.
- List<GossipMember> startupMembers = new ArrayList<>();
- for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) {
- URI u;
- try {
- u = new URI("udp://" + myIpAddress + ":" + (2000 + i));
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
- startupMembers.add(new RemoteGossipMember(cluster, u, "", 0, new HashMap<String,String>()));
- }
-
- // Lets start the gossip clients.
- // Start the clients, waiting cleaning-interval + 1 second between them which will show the
- // dead list handling.
- for (GossipMember member : startupMembers) {
- GossipService gossipService = new GossipService(cluster, member.getUri(), "", new HashMap<String,String>(),
- startupMembers, settings, null, new MetricRegistry());
- clients.add(gossipService);
- gossipService.start();
- sleep(settings.getCleanupInterval() + 1000);
- }
-
- // After starting all gossip clients, first wait 10 seconds and then shut them down.
- sleep(10000);
- System.err.println("Going to shutdown all services...");
- // Since they all run in the same virtual machine and share the same executor, if one is
- // shutdown they will all stop.
- clients.get(0).shutdown();
-
- } catch (UnknownHostException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-}
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
index dfeabd7..357e316 100644
--- a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
+++ b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
@@ -24,12 +24,11 @@
import java.util.HashMap;
import java.util.Map;
-import org.apache.gossip.GossipService;
import org.apache.gossip.GossipSettings;
-import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.RemoteMember;
import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
-
-import com.codahale.metrics.MetricRegistry;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
public class StandAloneDatacenterAndRack {
@@ -43,18 +42,21 @@
gossipProps.put("sameRackGossipIntervalMs", "2000");
gossipProps.put("differentDatacenterGossipIntervalMs", "10000");
s.setActiveGossipProperties(gossipProps);
-
-
Map<String, String> props = new HashMap<>();
props.put(DatacenterRackAwareActiveGossiper.DATACENTER, args[4]);
props.put(DatacenterRackAwareActiveGossiper.RACK, args[5]);
- GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1],
- props, Arrays.asList(new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])),
- s, (a, b) -> { }, new MetricRegistry());
- gossipService.start();
+ GossipManager manager = GossipManagerBuilder.newBuilder()
+ .cluster("mycluster")
+ .uri(URI.create(args[0]))
+ .id(args[1])
+ .gossipSettings(s)
+ .gossipMembers(Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3])))
+ .properties(props)
+ .build();
+ manager.init();
while (true){
- System.out.println("Live: " + gossipService.getGossipManager().getLiveMembers());
- System.out.println("Dead: " + gossipService.getGossipManager().getDeadMembers());
+ System.out.println("Live: " + manager.getLiveMembers());
+ System.out.println("Dead: " + manager.getDeadMembers());
Thread.sleep(2000);
}
}
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
index 3564943..b38865e 100644
--- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java
+++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
@@ -17,15 +17,13 @@
*/
package org.apache.gossip.examples;
-import com.codahale.metrics.MetricRegistry;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Arrays;
-import java.util.HashMap;
-
-import org.apache.gossip.GossipService;
import org.apache.gossip.GossipSettings;
-import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.RemoteMember;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
public class StandAloneNode {
public static void main (String [] args) throws UnknownHostException, InterruptedException{
@@ -33,12 +31,17 @@
s.setWindowSize(10);
s.setConvictThreshold(1.0);
s.setGossipInterval(10);
- GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], new HashMap<String, String>(),
- Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry());
- gossipService.start();
+ GossipManager gossipService = GossipManagerBuilder.newBuilder()
+ .cluster("mycluster")
+ .uri(URI.create(args[0]))
+ .id(args[1])
+ .gossipMembers(Arrays.asList( new RemoteMember("mycluster", URI.create(args[2]), args[3])))
+ .gossipSettings(s)
+ .build();
+ gossipService.init();
while (true){
- System.out.println( "Live: " + gossipService.getGossipManager().getLiveMembers());
- System.out.println( "Dead: " + gossipService.getGossipManager().getDeadMembers());
+ System.out.println("Live: " + gossipService.getLiveMembers());
+ System.out.println("Dead: " + gossipService.getDeadMembers());
Thread.sleep(2000);
}
}
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
index d1c1751..00f1279 100644
--- a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
+++ b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
@@ -17,20 +17,17 @@
*/
package org.apache.gossip.examples;
-import com.codahale.metrics.MetricRegistry;
-
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Arrays;
-import java.util.HashMap;
-
-import org.apache.gossip.GossipService;
import org.apache.gossip.GossipSettings;
-import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.RemoteMember;
import org.apache.gossip.crdt.OrSet;
-import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.SharedDataMessage;
public class StandAloneNodeCrdtOrSet {
public static void main (String [] args) throws InterruptedException, IOException{
@@ -38,17 +35,22 @@
s.setWindowSize(10);
s.setConvictThreshold(1.0);
s.setGossipInterval(10);
- GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], new HashMap<String, String>(),
- Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry());
- gossipService.start();
+ GossipManager gossipService = GossipManagerBuilder.newBuilder()
+ .cluster("mycluster")
+ .uri(URI.create(args[0]))
+ .id(args[1])
+ .gossipMembers(Arrays.asList( new RemoteMember("mycluster", URI.create(args[2]), args[3])))
+ .gossipSettings(s)
+ .build();
+ gossipService.init();
new Thread(() -> {
while (true){
- System.out.println("Live: " + gossipService.getGossipManager().getLiveMembers());
- System.out.println("Dead: " + gossipService.getGossipManager().getDeadMembers());
- System.out.println("---------- " + (gossipService.getGossipManager().findCrdt("abc") == null ? "":
- gossipService.getGossipManager().findCrdt("abc").value()));
- System.out.println("********** " + gossipService.getGossipManager().findCrdt("abc"));
+ System.out.println("Live: " + gossipService.getLiveMembers());
+ System.out.println("Dead: " + gossipService.getDeadMembers());
+ System.out.println("---------- " + (gossipService.findCrdt("abc") == null ? "":
+ gossipService.findCrdt("abc").value()));
+ System.out.println("********** " + gossipService.findCrdt("abc"));
try {
Thread.sleep(2000);
} catch (Exception e) {}
@@ -70,22 +72,23 @@
}
}
- private static void removeData(String val, GossipService gossipService){
- OrSet<String> s = (OrSet<String>) gossipService.getGossipManager().findCrdt("abc");
- SharedGossipDataMessage m = new SharedGossipDataMessage();
+ private static void removeData(String val, GossipManager gossipService){
+ @SuppressWarnings("unchecked")
+ OrSet<String> s = (OrSet<String>) gossipService.findCrdt("abc");
+ SharedDataMessage m = new SharedDataMessage();
m.setExpireAt(Long.MAX_VALUE);
m.setKey("abc");
m.setPayload(new OrSet<String>(s , new OrSet.Builder<String>().remove(val)));
m.setTimestamp(System.currentTimeMillis());
- gossipService.getGossipManager().merge(m);
+ gossipService.merge(m);
}
- private static void addData(String val, GossipService gossipService){
- SharedGossipDataMessage m = new SharedGossipDataMessage();
+ private static void addData(String val, GossipManager gossipService){
+ SharedDataMessage m = new SharedDataMessage();
m.setExpireAt(Long.MAX_VALUE);
m.setKey("abc");
m.setPayload(new OrSet<String>(val));
m.setTimestamp(System.currentTimeMillis());
- gossipService.getGossipManager().merge(m);
+ gossipService.merge(m);
}
}
diff --git a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
index 9fea30b..b73550e 100644
--- a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
+++ b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
@@ -24,16 +24,16 @@
import java.util.concurrent.ConcurrentHashMap;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
-import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.LocalMember;
import org.apache.gossip.model.ActiveGossipOk;
-import org.apache.gossip.model.GossipDataMessage;
-import org.apache.gossip.model.GossipMember;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.Member;
import org.apache.gossip.model.Response;
-import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.gossip.udp.UdpActiveGossipMessage;
-import org.apache.gossip.udp.UdpGossipDataMessage;
-import org.apache.gossip.udp.UdpSharedGossipDataMessage;
+import org.apache.gossip.udp.UdpPerNodeDataMessage;
+import org.apache.gossip.udp.UdpSharedDataMessage;
import org.apache.log4j.Logger;
import static com.codahale.metrics.MetricRegistry.name;
@@ -69,7 +69,7 @@
}
- public final void sendShutdownMessage(LocalGossipMember me, LocalGossipMember target){
+ public final void sendShutdownMessage(LocalMember me, LocalMember target){
if (target == null){
return;
}
@@ -79,13 +79,13 @@
gossipCore.sendOneWay(m, target.getUri());
}
- public final void sendSharedData(LocalGossipMember me, LocalGossipMember member){
+ public final void sendSharedData(LocalMember me, LocalMember member){
if (member == null){
return;
}
long startTime = System.currentTimeMillis();
- for (Entry<String, SharedGossipDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
- UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
+ for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
+ UdpSharedDataMessage message = new UdpSharedDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
@@ -98,14 +98,14 @@
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
}
- public final void sendPerNodeData(LocalGossipMember me, LocalGossipMember member){
+ public final void sendPerNodeData(LocalMember me, LocalMember member){
if (member == null){
return;
}
long startTime = System.currentTimeMillis();
- for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
- for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
- UdpGossipDataMessage message = new UdpGossipDataMessage();
+ for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
+ for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()){
+ UdpPerNodeDataMessage message = new UdpPerNodeDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
@@ -122,7 +122,7 @@
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
- protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
+ protected void sendMembershipList(LocalMember me, LocalMember member) {
if (member == null){
return;
}
@@ -132,7 +132,7 @@
message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
message.setUuid(UUID.randomUUID().toString());
message.getMembers().add(convert(me));
- for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
+ for (LocalMember other : gossipManager.getMembers().keySet()) {
message.getMembers().add(convert(other));
}
Response r = gossipCore.send(message, member.getUri());
@@ -144,8 +144,8 @@
sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
}
- protected final GossipMember convert(LocalGossipMember member){
- GossipMember gm = new GossipMember();
+ protected final Member convert(LocalMember member){
+ Member gm = new Member();
gm.setCluster(member.getClusterName());
gm.setHeartbeat(member.getHeartbeat());
gm.setUri(member.getUri().toASCIIString());
@@ -160,8 +160,8 @@
* An immutable list
* @return The chosen LocalGossipMember to gossip with.
*/
- protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
- LocalGossipMember member = null;
+ protected LocalMember selectPartner(List<LocalMember> memberList) {
+ LocalMember member = null;
if (memberList.size() > 0) {
int randomNeighborIndex = random.nextInt(memberList.size());
member = memberList.get(randomNeighborIndex);
diff --git a/src/main/java/org/apache/gossip/manager/DataReaper.java b/src/main/java/org/apache/gossip/manager/DataReaper.java
index f165239..8175a1b 100644
--- a/src/main/java/org/apache/gossip/manager/DataReaper.java
+++ b/src/main/java/org/apache/gossip/manager/DataReaper.java
@@ -23,8 +23,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.gossip.model.GossipDataMessage;
-import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.SharedDataMessage;
/**
* We wish to periodically sweep user data and remove entries past their timestamp. This
@@ -53,7 +53,7 @@
}
void runSharedOnce(){
- for (Entry<String, SharedGossipDataMessage> entry : gossipCore.getSharedData().entrySet()){
+ for (Entry<String, SharedDataMessage> entry : gossipCore.getSharedData().entrySet()){
if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){
gossipCore.getSharedData().remove(entry.getKey(), entry.getValue());
}
@@ -61,13 +61,13 @@
}
void runPerNodeOnce(){
- for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> node : gossipCore.getPerNodeData().entrySet()){
+ for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> node : gossipCore.getPerNodeData().entrySet()){
reapData(node.getValue());
}
}
- void reapData(ConcurrentHashMap<String, GossipDataMessage> concurrentHashMap){
- for (Entry<String, GossipDataMessage> entry : concurrentHashMap.entrySet()){
+ void reapData(ConcurrentHashMap<String, PerNodeDataMessage> concurrentHashMap){
+ for (Entry<String, PerNodeDataMessage> entry : concurrentHashMap.entrySet()){
if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){
concurrentHashMap.remove(entry.getKey(), entry.getValue());
}
diff --git a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
index c66e332..2f489a2 100644
--- a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
+++ b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
@@ -27,7 +27,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.LocalMember;
import com.codahale.metrics.MetricRegistry;
@@ -130,14 +130,14 @@
sendMembershipList(gossipManager.getMyself(), selectPartner(gossipManager.getDeadMembers()));
}
- private List<LocalGossipMember> differentDataCenter(){
+ private List<LocalMember> differentDataCenter(){
String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
String rack = gossipManager.getMyself().getProperties().get(RACK);
if (myDc == null|| rack == null){
return Collections.emptyList();
}
- List<LocalGossipMember> notMyDc = new ArrayList<LocalGossipMember>(10);
- for (LocalGossipMember i : gossipManager.getLiveMembers()){
+ List<LocalMember> notMyDc = new ArrayList<LocalMember>(10);
+ for (LocalMember i : gossipManager.getLiveMembers()){
if (!myDc.equals(i.getProperties().get(DATACENTER))){
notMyDc.add(i);
}
@@ -145,14 +145,14 @@
return notMyDc;
}
- private List<LocalGossipMember> sameDatacenterDifferentRack(){
+ private List<LocalMember> sameDatacenterDifferentRack(){
String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
String rack = gossipManager.getMyself().getProperties().get(RACK);
if (myDc == null|| rack == null){
return Collections.emptyList();
}
- List<LocalGossipMember> notMyDc = new ArrayList<LocalGossipMember>(10);
- for (LocalGossipMember i : gossipManager.getLiveMembers()){
+ List<LocalMember> notMyDc = new ArrayList<LocalMember>(10);
+ for (LocalMember i : gossipManager.getLiveMembers()){
if (myDc.equals(i.getProperties().get(DATACENTER)) && !rack.equals(i.getProperties().get(RACK))){
notMyDc.add(i);
}
@@ -160,14 +160,14 @@
return notMyDc;
}
- private List<LocalGossipMember> sameRackNodes(){
+ private List<LocalMember> sameRackNodes(){
String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
String rack = gossipManager.getMyself().getProperties().get(RACK);
if (myDc == null|| rack == null){
return Collections.emptyList();
}
- List<LocalGossipMember> sameDcAndRack = new ArrayList<LocalGossipMember>(10);
- for (LocalGossipMember i : gossipManager.getLiveMembers()){
+ List<LocalMember> sameDcAndRack = new ArrayList<LocalMember>(10);
+ for (LocalMember i : gossipManager.getLiveMembers()){
if (myDc.equals(i.getProperties().get(DATACENTER))
&& rack.equals(i.getProperties().get(RACK))){
sameDcAndRack.add(i);
@@ -177,7 +177,7 @@
}
private void sendToSameRackMember() {
- LocalGossipMember i = selectPartner(sameRackNodes());
+ LocalMember i = selectPartner(sameRackNodes());
sendMembershipList(gossipManager.getMyself(), i);
}
@@ -235,7 +235,7 @@
* sends an optimistic shutdown message to several clusters nodes
*/
protected void sendShutdownMessage(){
- List<LocalGossipMember> l = gossipManager.getLiveMembers();
+ List<LocalMember> l = gossipManager.getLiveMembers();
int sendTo = l.size() < 3 ? 1 : l.size() / 3;
for (int i = 0; i < sendTo; i++) {
threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index a24b125..e3dcb21 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -20,9 +20,9 @@
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
-import org.apache.gossip.GossipMember;
-import org.apache.gossip.LocalGossipMember;
-import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.Member;
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.RemoteMember;
import org.apache.gossip.crdt.Crdt;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.model.*;
@@ -49,8 +49,8 @@
private final GossipManager gossipManager;
private ConcurrentHashMap<String, Base> requests;
private ThreadPoolExecutor service;
- private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> perNodeData;
- private final ConcurrentHashMap<String, SharedGossipDataMessage> sharedData;
+ private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
+ private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
private final BlockingQueue<Runnable> workQueue;
private final PKCS8EncodedKeySpec privKeySpec;
private final PrivateKey privKey;
@@ -113,14 +113,14 @@
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- public void addSharedData(SharedGossipDataMessage message) {
+ public void addSharedData(SharedDataMessage message) {
while (true){
- SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
+ SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
if (previous == null){
return;
}
if (message.getPayload() instanceof Crdt){
- SharedGossipDataMessage merged = new SharedGossipDataMessage();
+ SharedDataMessage merged = new SharedDataMessage();
merged.setExpireAt(message.getExpireAt());
merged.setKey(message.getKey());
merged.setNodeId(message.getNodeId());
@@ -144,12 +144,12 @@
}
}
- public void addPerNodeData(GossipDataMessage message){
- ConcurrentHashMap<String,GossipDataMessage> nodeMap = new ConcurrentHashMap<>();
+ public void addPerNodeData(PerNodeDataMessage message){
+ ConcurrentHashMap<String,PerNodeDataMessage> nodeMap = new ConcurrentHashMap<>();
nodeMap.put(message.getKey(), message);
nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap);
if (nodeMap != null){
- GossipDataMessage current = nodeMap.get(message.getKey());
+ PerNodeDataMessage current = nodeMap.get(message.getKey());
if (current == null){
nodeMap.putIfAbsent(message.getKey(), message);
} else {
@@ -160,11 +160,11 @@
}
}
- public ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> getPerNodeData(){
+ public ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> getPerNodeData(){
return perNodeData;
}
- public ConcurrentHashMap<String, SharedGossipDataMessage> getSharedData() {
+ public ConcurrentHashMap<String, SharedDataMessage> getSharedData() {
return sharedData;
}
@@ -314,12 +314,12 @@
* @param remoteList
*
*/
- public void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
- List<GossipMember> remoteList) {
+ public void mergeLists(GossipManager gossipManager, RemoteMember senderMember,
+ List<Member> remoteList) {
if (LOGGER.isDebugEnabled()){
debugState(senderMember, remoteList);
}
- for (LocalGossipMember i : gossipManager.getDeadMembers()) {
+ for (LocalMember i : gossipManager.getDeadMembers()) {
if (i.getId().equals(senderMember.getId())) {
LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
i.recordHeartbeat(senderMember.getHeartbeat());
@@ -327,11 +327,11 @@
//TODO consider forcing an UP here
}
}
- for (GossipMember remoteMember : remoteList) {
+ for (Member remoteMember : remoteList) {
if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
continue;
}
- LocalGossipMember aNewMember = new LocalGossipMember(remoteMember.getClusterName(),
+ LocalMember aNewMember = new LocalMember(remoteMember.getClusterName(),
remoteMember.getUri(),
remoteMember.getId(),
remoteMember.getHeartbeat(),
@@ -342,7 +342,7 @@
aNewMember.recordHeartbeat(remoteMember.getHeartbeat());
Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP);
if (result != null){
- for (Entry<LocalGossipMember, GossipState> localMember : gossipManager.getMembers().entrySet()){
+ for (Entry<LocalMember, GossipState> localMember : gossipManager.getMembers().entrySet()){
if (localMember.getKey().getId().equals(remoteMember.getId())){
localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat());
localMember.getKey().setHeartbeat(remoteMember.getHeartbeat());
@@ -356,8 +356,8 @@
}
}
- private void debugState(RemoteGossipMember senderMember,
- List<GossipMember> remoteList){
+ private void debugState(RemoteMember senderMember,
+ List<Member> remoteList){
LOGGER.warn(
"-----------------------\n" +
"Me " + gossipManager.getMyself() + "\n" +
@@ -369,13 +369,13 @@
}
@SuppressWarnings("rawtypes")
- public Crdt merge(SharedGossipDataMessage message) {
+ public Crdt merge(SharedDataMessage message) {
for (;;){
- SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
+ SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
if (previous == null){
return (Crdt) message.getPayload();
}
- SharedGossipDataMessage copy = new SharedGossipDataMessage();
+ SharedDataMessage copy = new SharedDataMessage();
copy.setExpireAt(message.getExpireAt());
copy.setKey(message.getKey());
copy.setNodeId(message.getNodeId());
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 4b28f2f..ba8517b 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -19,16 +19,16 @@
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.gossip.GossipMember;
+import org.apache.gossip.Member;
import org.apache.gossip.GossipSettings;
-import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.LocalMember;
import org.apache.gossip.crdt.Crdt;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.handlers.MessageInvoker;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
-import org.apache.gossip.model.GossipDataMessage;
-import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.log4j.Logger;
@@ -44,13 +44,12 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-
public abstract class GossipManager {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
- private final ConcurrentSkipListMap<LocalGossipMember, GossipState> members;
- private final LocalGossipMember me;
+ private final ConcurrentSkipListMap<LocalMember, GossipState> members;
+ private final LocalMember me;
private final GossipSettings settings;
private final AtomicBoolean gossipServiceRunning;
private final GossipListener listener;
@@ -70,19 +69,19 @@
public GossipManager(String cluster,
URI uri, String id, Map<String, String> properties, GossipSettings settings,
- List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry,
+ List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
ObjectMapper objectMapper, MessageInvoker messageInvoker) {
this.settings = settings;
this.messageInvoker = messageInvoker;
clock = new SystemClock();
- me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties,
+ me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
gossipCore = new GossipCore(this, registry);
dataReaper = new DataReaper(gossipCore, clock);
members = new ConcurrentSkipListMap<>();
- for (GossipMember startupMember : gossipMembers) {
+ for (Member startupMember : gossipMembers) {
if (!startupMember.equals(me)) {
- LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
+ LocalMember member = new LocalMember(startupMember.getClusterName(),
startupMember.getUri(), startupMember.getId(),
clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
settings.getMinimumSamples(), settings.getDistribution());
@@ -106,7 +105,7 @@
return messageInvoker;
}
- public ConcurrentSkipListMap<LocalGossipMember, GossipState> getMembers() {
+ public ConcurrentSkipListMap<LocalMember, GossipState> getMembers() {
return members;
}
@@ -117,7 +116,7 @@
/**
* @return a read only list of members found in the DOWN state.
*/
- public List<LocalGossipMember> getDeadMembers() {
+ public List<LocalMember> getDeadMembers() {
return Collections.unmodifiableList(
members.entrySet()
.stream()
@@ -129,7 +128,7 @@
*
* @return a read only list of members found in the UP state
*/
- public List<LocalGossipMember> getLiveMembers() {
+ public List<LocalMember> getLiveMembers() {
return Collections.unmodifiableList(
members.entrySet()
.stream()
@@ -137,7 +136,7 @@
.map(Entry::getKey).collect(Collectors.toList()));
}
- public LocalGossipMember getMyself() {
+ public LocalMember getMyself() {
return me;
}
@@ -164,7 +163,7 @@
scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
scheduledServiced.scheduleAtFixedRate(() -> {
try {
- for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
+ for (Entry<LocalMember, GossipState> entry : members.entrySet()) {
boolean userDown = processOptomisticShutdown(entry);
if (userDown)
continue;
@@ -205,8 +204,8 @@
* @param l member to consider
* @return true if node forced down
*/
- public boolean processOptomisticShutdown(Entry<LocalGossipMember, GossipState> l){
- GossipDataMessage m = findPerNodeGossipData(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
+ public boolean processOptomisticShutdown(Entry<LocalMember, GossipState> l){
+ PerNodeDataMessage m = findPerNodeGossipData(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
if (m == null){
return false;
}
@@ -224,8 +223,8 @@
}
private void readSavedRingState() {
- for (LocalGossipMember l : ringState.readFromDisk()){
- LocalGossipMember member = new LocalGossipMember(l.getClusterName(),
+ for (LocalMember l : ringState.readFromDisk()){
+ LocalMember member = new LocalMember(l.getClusterName(),
l.getUri(), l.getId(),
clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
settings.getMinimumSamples(), settings.getDistribution());
@@ -234,12 +233,12 @@
}
private void readSavedDataState() {
- for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){
- for (Entry<String, GossipDataMessage> j : l.getValue().entrySet()){
+ for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){
+ for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()){
gossipCore.addPerNodeData(j.getValue());
}
}
- for (Entry<String, SharedGossipDataMessage> l: userDataState.readSharedDataFromDisk().entrySet()){
+ for (Entry<String, SharedDataMessage> l: userDataState.readSharedDataFromDisk().entrySet()){
gossipCore.addSharedData(l.getValue());
}
}
@@ -276,7 +275,7 @@
scheduledServiced.shutdownNow();
}
- public void gossipPerNodeData(GossipDataMessage message){
+ public void gossipPerNodeData(PerNodeDataMessage message){
Objects.nonNull(message.getKey());
Objects.nonNull(message.getTimestamp());
Objects.nonNull(message.getPayload());
@@ -284,7 +283,7 @@
gossipCore.addPerNodeData(message);
}
- public void gossipSharedData(SharedGossipDataMessage message){
+ public void gossipSharedData(SharedDataMessage message){
Objects.nonNull(message.getKey());
Objects.nonNull(message.getTimestamp());
Objects.nonNull(message.getPayload());
@@ -295,7 +294,7 @@
@SuppressWarnings("rawtypes")
public Crdt findCrdt(String key){
- SharedGossipDataMessage l = gossipCore.getSharedData().get(key);
+ SharedDataMessage l = gossipCore.getSharedData().get(key);
if (l == null){
return null;
}
@@ -307,7 +306,7 @@
}
@SuppressWarnings("rawtypes")
- public Crdt merge(SharedGossipDataMessage message){
+ public Crdt merge(SharedDataMessage message){
Objects.nonNull(message.getKey());
Objects.nonNull(message.getTimestamp());
Objects.nonNull(message.getPayload());
@@ -318,12 +317,12 @@
return gossipCore.merge(message);
}
- public GossipDataMessage findPerNodeGossipData(String nodeId, String key){
- ConcurrentHashMap<String, GossipDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
+ public PerNodeDataMessage findPerNodeGossipData(String nodeId, String key){
+ ConcurrentHashMap<String, PerNodeDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
if (j == null){
return null;
} else {
- GossipDataMessage l = j.get(key);
+ PerNodeDataMessage l = j.get(key);
if (l == null){
return null;
}
@@ -334,8 +333,8 @@
}
}
- public SharedGossipDataMessage findSharedGossipData(String key){
- SharedGossipDataMessage l = gossipCore.getSharedData().get(key);
+ public SharedDataMessage findSharedGossipData(String key){
+ SharedDataMessage l = gossipCore.getSharedData().get(key);
if (l == null){
return null;
}
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
similarity index 77%
rename from src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
rename to src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
index 3ac237a..b87045b 100644
--- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
@@ -15,16 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gossip.manager.random;
+package org.apache.gossip.manager;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonGenerator.Feature;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.gossip.GossipMember;
+import org.apache.gossip.Member;
import org.apache.gossip.GossipSettings;
+import org.apache.gossip.StartupSettings;
import org.apache.gossip.crdt.CrdtModule;
import org.apache.gossip.event.GossipListener;
-import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
import org.apache.gossip.manager.handlers.MessageInvoker;
@@ -34,7 +34,7 @@
import java.util.List;
import java.util.Map;
-public class RandomGossipManager extends GossipManager {
+public class GossipManagerBuilder {
public static ManagerBuilder newBuilder() {
return new ManagerBuilder();
@@ -45,7 +45,7 @@
private URI uri;
private String id;
private GossipSettings settings;
- private List<GossipMember> gossipMembers;
+ private List<Member> gossipMembers;
private GossipListener listener;
private MetricRegistry registry;
private Map<String,String> properties;
@@ -70,17 +70,26 @@
return this;
}
- public ManagerBuilder withId(String id) {
+ public ManagerBuilder id(String id) {
this.id = id;
return this;
}
- public ManagerBuilder settings(GossipSettings settings) {
+ public ManagerBuilder gossipSettings(GossipSettings settings) {
this.settings = settings;
return this;
}
+
+ public ManagerBuilder startupSettings(StartupSettings startupSettings) {
+ this.cluster = startupSettings.getCluster();
+ this.id = startupSettings.getId();
+ this.settings = startupSettings.getGossipSettings();
+ this.gossipMembers = startupSettings.getGossipMembers();
+ this.uri = startupSettings.getUri();
+ return this;
+ }
- public ManagerBuilder gossipMembers(List<GossipMember> members) {
+ public ManagerBuilder gossipMembers(List<Member> members) {
this.gossipMembers = members;
return this;
}
@@ -110,12 +119,14 @@
return this;
}
- public RandomGossipManager build() {
+ public GossipManager build() {
checkArgument(id != null, "You must specify an id");
checkArgument(cluster != null, "You must specify a cluster name");
checkArgument(settings != null, "You must specify gossip settings");
checkArgument(uri != null, "You must specify a uri");
- checkArgument(registry != null, "You must specify a MetricRegistry");
+ if (registry == null){
+ registry = new MetricRegistry();
+ }
if (properties == null){
properties = new HashMap<String,String>();
}
@@ -133,13 +144,9 @@
}
if (messageInvoker == null) {
messageInvoker = new DefaultMessageInvoker();
- }
- return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker);
+ }
+ return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker) {} ;
}
}
- private RandomGossipManager(String cluster, URI uri, String id, Map<String,String> properties, GossipSettings settings,
- List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper, MessageInvoker messageInvoker) {
- super(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker);
- }
}
diff --git a/src/main/java/org/apache/gossip/manager/RingStatePersister.java b/src/main/java/org/apache/gossip/manager/RingStatePersister.java
index 6f724e0..7e42562 100644
--- a/src/main/java/org/apache/gossip/manager/RingStatePersister.java
+++ b/src/main/java/org/apache/gossip/manager/RingStatePersister.java
@@ -26,7 +26,7 @@
import java.util.List;
import java.util.NavigableSet;
-import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.LocalMember;
import org.apache.log4j.Logger;
public class RingStatePersister implements Runnable {
@@ -52,7 +52,7 @@
if (!parent.getSettings().isPersistRingState()){
return;
}
- NavigableSet<LocalGossipMember> i = parent.getMembers().keySet();
+ NavigableSet<LocalMember> i = parent.getMembers().keySet();
try (FileOutputStream fos = new FileOutputStream(computeTarget())){
parent.getObjectMapper().writeValue(fos, i);
} catch (IOException e) {
@@ -61,7 +61,7 @@
}
@SuppressWarnings("unchecked")
- List<LocalGossipMember> readFromDisk(){
+ List<LocalMember> readFromDisk(){
if (!parent.getSettings().isPersistRingState()){
return Collections.emptyList();
}
diff --git a/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
index 839d796..e47fe2a 100644
--- a/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
+++ b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
@@ -25,7 +25,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.LocalMember;
import com.codahale.metrics.MetricRegistry;
@@ -88,12 +88,12 @@
}
protected void sendToALiveMember(){
- LocalGossipMember member = selectPartner(gossipManager.getLiveMembers());
+ LocalMember member = selectPartner(gossipManager.getLiveMembers());
sendMembershipList(gossipManager.getMyself(), member);
}
protected void sendToDeadMember(){
- LocalGossipMember member = selectPartner(gossipManager.getDeadMembers());
+ LocalMember member = selectPartner(gossipManager.getDeadMembers());
sendMembershipList(gossipManager.getMyself(), member);
}
@@ -101,7 +101,7 @@
* sends an optimistic shutdown message to several clusters nodes
*/
protected void sendShutdownMessage(){
- List<LocalGossipMember> l = gossipManager.getLiveMembers();
+ List<LocalMember> l = gossipManager.getLiveMembers();
int sendTo = l.size() < 3 ? 1 : l.size() / 2;
for (int i = 0; i < sendTo; i++) {
threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
diff --git a/src/main/java/org/apache/gossip/manager/UserDataPersister.java b/src/main/java/org/apache/gossip/manager/UserDataPersister.java
index 4a8a415..3b9eafa 100644
--- a/src/main/java/org/apache/gossip/manager/UserDataPersister.java
+++ b/src/main/java/org/apache/gossip/manager/UserDataPersister.java
@@ -23,8 +23,8 @@
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.gossip.model.GossipDataMessage;
-import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.SharedDataMessage;
import org.apache.log4j.Logger;
public class UserDataPersister implements Runnable {
@@ -49,16 +49,16 @@
}
@SuppressWarnings("unchecked")
- ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> readPerNodeFromDisk(){
+ ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> readPerNodeFromDisk(){
if (!parent.getSettings().isPersistDataState()){
- return new ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>();
+ return new ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>>();
}
try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){
return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
} catch (IOException e) {
LOGGER.debug(e);
}
- return new ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>();
+ return new ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>>();
}
void writePerNodeToDisk(){
@@ -84,16 +84,16 @@
}
@SuppressWarnings("unchecked")
- ConcurrentHashMap<String, SharedGossipDataMessage> readSharedDataFromDisk(){
+ ConcurrentHashMap<String, SharedDataMessage> readSharedDataFromDisk(){
if (!parent.getSettings().isPersistRingState()){
- return new ConcurrentHashMap<String, SharedGossipDataMessage>();
+ return new ConcurrentHashMap<String, SharedDataMessage>();
}
try (FileInputStream fos = new FileInputStream(computeSharedTarget())){
return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
} catch (IOException e) {
LOGGER.debug(e);
}
- return new ConcurrentHashMap<String, SharedGossipDataMessage>();
+ return new ConcurrentHashMap<String, SharedDataMessage>();
}
/**
diff --git a/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
index 54aa40c..f5e568e 100644
--- a/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
+++ b/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
@@ -17,8 +17,8 @@
*/
package org.apache.gossip.manager.handlers;
-import org.apache.gossip.GossipMember;
-import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.Member;
+import org.apache.gossip.RemoteMember;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
@@ -34,8 +34,8 @@
public class ActiveGossipMessageHandler implements MessageHandler {
@Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
- List<GossipMember> remoteGossipMembers = new ArrayList<>();
- RemoteGossipMember senderMember = null;
+ List<Member> remoteGossipMembers = new ArrayList<>();
+ RemoteMember senderMember = null;
UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
URI u;
@@ -45,7 +45,7 @@
GossipCore.LOGGER.debug("Gossip message with faulty URI", e);
continue;
}
- RemoteGossipMember member = new RemoteGossipMember(
+ RemoteMember member = new RemoteMember(
activeGossipMessage.getMembers().get(i).getCluster(),
u,
activeGossipMessage.getMembers().get(i).getId(),
diff --git a/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java b/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java
index 034691d..5b78ce3 100644
--- a/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java
+++ b/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java
@@ -29,8 +29,8 @@
mic = new MessageInvokerCombiner();
mic.add(new SimpleMessageInvoker(Response.class, new ResponseHandler()));
mic.add(new SimpleMessageInvoker(ShutdownMessage.class, new ShutdownMessageHandler()));
- mic.add(new SimpleMessageInvoker(GossipDataMessage.class, new GossipDataMessageHandler()));
- mic.add(new SimpleMessageInvoker(SharedGossipDataMessage.class, new SharedGossipDataMessageHandler()));
+ mic.add(new SimpleMessageInvoker(PerNodeDataMessage.class, new PerNodeDataMessageHandler()));
+ mic.add(new SimpleMessageInvoker(SharedDataMessage.class, new SharedDataMessageHandler()));
mic.add(new SimpleMessageInvoker(ActiveGossipMessage.class, new ActiveGossipMessageHandler()));
}
diff --git a/src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
similarity index 85%
rename from src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java
rename to src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
index edf2579..b3a785e 100644
--- a/src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java
+++ b/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
@@ -20,12 +20,12 @@
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
-import org.apache.gossip.udp.UdpGossipDataMessage;
+import org.apache.gossip.udp.UdpPerNodeDataMessage;
-public class GossipDataMessageHandler implements MessageHandler {
+public class PerNodeDataMessageHandler implements MessageHandler {
@Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
- UdpGossipDataMessage message = (UdpGossipDataMessage) base;
+ UdpPerNodeDataMessage message = (UdpPerNodeDataMessage) base;
gossipCore.addPerNodeData(message);
}
}
diff --git a/src/main/java/org/apache/gossip/manager/handlers/SharedGossipDataMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java
similarity index 84%
rename from src/main/java/org/apache/gossip/manager/handlers/SharedGossipDataMessageHandler.java
rename to src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java
index e9d5343..89ca4a0 100644
--- a/src/main/java/org/apache/gossip/manager/handlers/SharedGossipDataMessageHandler.java
+++ b/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java
@@ -20,12 +20,12 @@
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
-import org.apache.gossip.udp.UdpSharedGossipDataMessage;
+import org.apache.gossip.udp.UdpSharedDataMessage;
-public class SharedGossipDataMessageHandler implements MessageHandler{
+public class SharedDataMessageHandler implements MessageHandler{
@Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
- UdpSharedGossipDataMessage message = (UdpSharedGossipDataMessage) base;
+ UdpSharedDataMessage message = (UdpSharedDataMessage) base;
gossipCore.addSharedData(message);
}
}
diff --git a/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java
index c4adea2..a40c7a1 100644
--- a/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java
+++ b/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java
@@ -20,14 +20,14 @@
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
-import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.ShutdownMessage;
public class ShutdownMessageHandler implements MessageHandler {
@Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
ShutdownMessage s = (ShutdownMessage) base;
- GossipDataMessage m = new GossipDataMessage();
+ PerNodeDataMessage m = new PerNodeDataMessage();
m.setKey(ShutdownMessage.PER_NODE_KEY);
m.setNodeId(s.getNodeId());
m.setPayload(base);
diff --git a/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java
index 438f42a..a3c45b8 100644
--- a/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java
+++ b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java
@@ -22,17 +22,17 @@
public class ActiveGossipMessage extends Base {
- private List<GossipMember> members = new ArrayList<>();
+ private List<Member> members = new ArrayList<>();
public ActiveGossipMessage(){
}
- public List<GossipMember> getMembers() {
+ public List<Member> getMembers() {
return members;
}
- public void setMembers(List<GossipMember> members) {
+ public void setMembers(List<Member> members) {
this.members = members;
}
diff --git a/src/main/java/org/apache/gossip/model/Base.java b/src/main/java/org/apache/gossip/model/Base.java
index 4551f2a..1b66310 100644
--- a/src/main/java/org/apache/gossip/model/Base.java
+++ b/src/main/java/org/apache/gossip/model/Base.java
@@ -19,9 +19,9 @@
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk;
-import org.apache.gossip.udp.UdpGossipDataMessage;
+import org.apache.gossip.udp.UdpPerNodeDataMessage;
import org.apache.gossip.udp.UdpNotAMemberFault;
-import org.apache.gossip.udp.UdpSharedGossipDataMessage;
+import org.apache.gossip.udp.UdpSharedDataMessage;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonSubTypes;
@@ -39,10 +39,10 @@
@Type(value = UdpActiveGossipOk.class, name = "UdpActiveGossipOk"),
@Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"),
@Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"),
- @Type(value = GossipDataMessage.class, name = "GossipDataMessage"),
- @Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage"),
- @Type(value = SharedGossipDataMessage.class, name = "SharedGossipDataMessage"),
- @Type(value = UdpSharedGossipDataMessage.class, name = "UdpSharedGossipDataMessage")
+ @Type(value = PerNodeDataMessage.class, name = "PerNodeDataMessage"),
+ @Type(value = UdpPerNodeDataMessage.class, name = "UdpPerNodeDataMessage"),
+ @Type(value = SharedDataMessage.class, name = "SharedDataMessage"),
+ @Type(value = UdpSharedDataMessage.class, name = "UdpSharedDataMessage")
})
public class Base {
diff --git a/src/main/java/org/apache/gossip/model/GossipMember.java b/src/main/java/org/apache/gossip/model/Member.java
similarity index 89%
rename from src/main/java/org/apache/gossip/model/GossipMember.java
rename to src/main/java/org/apache/gossip/model/Member.java
index a318776..d86aad8 100644
--- a/src/main/java/org/apache/gossip/model/GossipMember.java
+++ b/src/main/java/org/apache/gossip/model/Member.java
@@ -19,7 +19,7 @@
import java.util.Map;
-public class GossipMember {
+public class Member {
private String cluster;
private String uri;
@@ -27,11 +27,11 @@
private Long heartbeat;
private Map<String,String> properties;
- public GossipMember(){
+ public Member(){
}
- public GossipMember(String cluster, String uri, String id, Long heartbeat){
+ public Member(String cluster, String uri, String id, Long heartbeat){
this.cluster = cluster;
this.uri = uri;
this.id = id;
@@ -80,7 +80,7 @@
@Override
public String toString() {
- return "GossipMember [cluster=" + cluster + ", uri=" + uri + ", id=" + id + ", heartbeat="
+ return "Member [cluster=" + cluster + ", uri=" + uri + ", id=" + id + ", heartbeat="
+ heartbeat + ", properties=" + properties + "]";
}
diff --git a/src/main/java/org/apache/gossip/model/GossipDataMessage.java b/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java
similarity index 97%
rename from src/main/java/org/apache/gossip/model/GossipDataMessage.java
rename to src/main/java/org/apache/gossip/model/PerNodeDataMessage.java
index e9aae61..2d1cdef 100644
--- a/src/main/java/org/apache/gossip/model/GossipDataMessage.java
+++ b/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java
@@ -17,7 +17,7 @@
*/
package org.apache.gossip.model;
-public class GossipDataMessage extends Base {
+public class PerNodeDataMessage extends Base {
private String nodeId;
private String key;
diff --git a/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java b/src/main/java/org/apache/gossip/model/SharedDataMessage.java
similarity index 97%
rename from src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java
rename to src/main/java/org/apache/gossip/model/SharedDataMessage.java
index dc80f05..e423be8 100644
--- a/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java
+++ b/src/main/java/org/apache/gossip/model/SharedDataMessage.java
@@ -17,7 +17,7 @@
*/
package org.apache.gossip.model;
-public class SharedGossipDataMessage extends Base {
+public class SharedDataMessage extends Base {
private String nodeId;
private String key;
diff --git a/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java b/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java
similarity index 90%
rename from src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java
rename to src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java
index f2042a5..6eb170a 100644
--- a/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java
+++ b/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java
@@ -17,9 +17,9 @@
*/
package org.apache.gossip.udp;
-import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.PerNodeDataMessage;
-public class UdpGossipDataMessage extends GossipDataMessage implements Trackable {
+public class UdpPerNodeDataMessage extends PerNodeDataMessage implements Trackable {
private String uriFrom;
private String uuid;
diff --git a/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java b/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java
similarity index 90%
rename from src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java
rename to src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java
index 6020328..1658503 100644
--- a/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java
+++ b/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java
@@ -17,9 +17,9 @@
*/
package org.apache.gossip.udp;
-import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.model.SharedDataMessage;
-public class UdpSharedGossipDataMessage extends SharedGossipDataMessage implements Trackable {
+public class UdpSharedDataMessage extends SharedDataMessage implements Trackable {
private String uriFrom;
private String uuid;
diff --git a/src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java b/src/test/java/org/apache/gossip/AbstractIntegrationBase.java
similarity index 60%
copy from src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java
copy to src/test/java/org/apache/gossip/AbstractIntegrationBase.java
index edf2579..896157f 100644
--- a/src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java
+++ b/src/test/java/org/apache/gossip/AbstractIntegrationBase.java
@@ -15,17 +15,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gossip.manager.handlers;
-import org.apache.gossip.manager.GossipCore;
+package org.apache.gossip;
+
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.Base;
-import org.apache.gossip.udp.UdpGossipDataMessage;
+import org.junit.After;
+import org.junit.Before;
-public class GossipDataMessageHandler implements MessageHandler {
- @Override
- public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
- UdpGossipDataMessage message = (UdpGossipDataMessage) base;
- gossipCore.addPerNodeData(message);
+public abstract class AbstractIntegrationBase {
+
+ List <GossipManager> nodes = new ArrayList<GossipManager>();
+
+ public void register(GossipManager manager){
+ nodes.add(manager);
}
+
+ @Before
+ public void before(){
+ nodes = new ArrayList<GossipManager>();
+ }
+
+ @After
+ public void after(){
+ for (GossipManager node: nodes){
+ if (node !=null){
+ node.shutdown();
+ }
+ }
+ }
+
}
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
index 3892e9b..147702d 100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -17,26 +17,26 @@
*/
package org.apache.gossip;
-import com.codahale.metrics.MetricRegistry;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.crdt.GrowOnlySet;
import org.apache.gossip.crdt.OrSet;
-import org.apache.gossip.model.GossipDataMessage;
-import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.SharedDataMessage;
import org.junit.Test;
import io.teknek.tunit.TUnit;
-public class DataTest {
+public class DataTest extends AbstractIntegrationBase {
private String orSetKey = "cror";
@@ -47,25 +47,25 @@
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
- List<GossipMember> startupMembers = new ArrayList<>();
+ List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes+1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
- startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
+ startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
- final List<GossipService> clients = new ArrayList<>();
+ final List<GossipManager> clients = new ArrayList<>();
final int clusterMembers = 2;
- for (int i = 1; i < clusterMembers+1; ++i) {
+ for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
- GossipService gossipService = new GossipService(cluster, uri, i + "",
- new HashMap<String,String>(), startupMembers, settings,
- (a,b) -> {}, new MetricRegistry());
+ GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
+ .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
clients.add(gossipService);
- gossipService.start();
+ gossipService.init();
+ register(gossipService);
}
TUnit.assertThat(() -> {
int total = 0;
for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).getGossipManager().getLiveMembers().size();
+ total += clients.get(i).getLiveMembers().size();
}
return total;
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
@@ -73,7 +73,7 @@
clients.get(0).gossipSharedData(sharedMsg());
TUnit.assertThat(()-> {
- GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a");
+ PerNodeDataMessage x = clients.get(1).findPerNodeGossipData(1 + "", "a");
if (x == null)
return "";
else
@@ -81,7 +81,7 @@
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
TUnit.assertThat(() -> {
- SharedGossipDataMessage x = clients.get(1).findSharedData("a");
+ SharedDataMessage x = clients.get(1).findSharedGossipData("a");
if (x == null)
return "";
else
@@ -95,71 +95,67 @@
assertThatOrSetIsMerged(clients);
dropIt(clients);
assertThatOrSetDelIsMerged(clients);
-
- for (int i = 0; i < clusterMembers; ++i) {
- clients.get(i).shutdown();
- }
}
- private void givenOrs(List<GossipService> clients) {
+ private void givenOrs(List<GossipManager> clients) {
{
- SharedGossipDataMessage d = new SharedGossipDataMessage();
+ SharedDataMessage d = new SharedDataMessage();
d.setKey(orSetKey);
d.setPayload(new OrSet<String>("1", "2"));
d.setExpireAt(Long.MAX_VALUE);
d.setTimestamp(System.currentTimeMillis());
- clients.get(0).getGossipManager().merge(d);
+ clients.get(0).merge(d);
}
{
- SharedGossipDataMessage d = new SharedGossipDataMessage();
+ SharedDataMessage d = new SharedDataMessage();
d.setKey(orSetKey);
d.setPayload(new OrSet<String>("3", "4"));
d.setExpireAt(Long.MAX_VALUE);
d.setTimestamp(System.currentTimeMillis());
- clients.get(1).getGossipManager().merge(d);
+ clients.get(1).merge(d);
}
}
- private void dropIt(List<GossipService> clients) {
+ private void dropIt(List<GossipManager> clients) {
@SuppressWarnings("unchecked")
- OrSet<String> o = (OrSet<String>) clients.get(0).getGossipManager().findCrdt(orSetKey);
+ OrSet<String> o = (OrSet<String>) clients.get(0).findCrdt(orSetKey);
OrSet<String> o2 = new OrSet<String>(o, new OrSet.Builder<String>().remove("3"));
- SharedGossipDataMessage d = new SharedGossipDataMessage();
+ SharedDataMessage d = new SharedDataMessage();
d.setKey(orSetKey);
d.setPayload(o2);
d.setExpireAt(Long.MAX_VALUE);
d.setTimestamp(System.currentTimeMillis());
- clients.get(0).getGossipManager().merge(d);
+ clients.get(0).merge(d);
}
- private void assertThatOrSetIsMerged(final List<GossipService> clients){
+ private void assertThatOrSetIsMerged(final List<GossipManager> clients){
TUnit.assertThat(() -> {
- return clients.get(0).getGossipManager().findCrdt(orSetKey).value();
+ return clients.get(0).findCrdt(orSetKey).value();
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet<String>("1", "2", "3", "4").value());
TUnit.assertThat(() -> {
- return clients.get(1).getGossipManager().findCrdt(orSetKey).value();
+ return clients.get(1).findCrdt(orSetKey).value();
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet<String>("1", "2", "3", "4").value());
}
- private void assertThatOrSetDelIsMerged(final List<GossipService> clients){
+ private void assertThatOrSetDelIsMerged(final List<GossipManager> clients){
TUnit.assertThat(() -> {
- return clients.get(0).getGossipManager().findCrdt(orSetKey);
+ return clients.get(0).findCrdt(orSetKey);
}).afterWaitingAtMost(10, TimeUnit.SECONDS).equals(new OrSet<String>("1", "2", "4"));
}
- private void givenDifferentDatumsInSet(final List<GossipService> clients){
- clients.get(0).getGossipManager().merge(CrdtMessage("1"));
- clients.get(1).getGossipManager().merge(CrdtMessage("2"));
+ private void givenDifferentDatumsInSet(final List<GossipManager> clients){
+ clients.get(0).merge(CrdtMessage("1"));
+ clients.get(1).merge(CrdtMessage("2"));
}
- private void assertThatListIsMerged(final List<GossipService> clients){
+ private void assertThatListIsMerged(final List<GossipManager> clients){
TUnit.assertThat(() -> {
- return clients.get(0).getGossipManager().findCrdt("cr");
+ return clients.get(0).findCrdt("cr");
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlySet<String>(Arrays.asList("1","2")));
}
- private SharedGossipDataMessage CrdtMessage(String item){
- SharedGossipDataMessage d = new SharedGossipDataMessage();
+ private SharedDataMessage CrdtMessage(String item){
+ SharedDataMessage d = new SharedDataMessage();
d.setKey("cr");
d.setPayload(new GrowOnlySet<String>( Arrays.asList(item)));
d.setExpireAt(Long.MAX_VALUE);
@@ -167,8 +163,8 @@
return d;
}
- private GossipDataMessage msg(){
- GossipDataMessage g = new GossipDataMessage();
+ private PerNodeDataMessage msg(){
+ PerNodeDataMessage g = new PerNodeDataMessage();
g.setExpireAt(Long.MAX_VALUE);
g.setKey("a");
g.setPayload("b");
@@ -176,8 +172,8 @@
return g;
}
- private SharedGossipDataMessage sharedMsg(){
- SharedGossipDataMessage g = new SharedGossipDataMessage();
+ private SharedDataMessage sharedMsg(){
+ SharedDataMessage g = new SharedDataMessage();
g.setExpireAt(Long.MAX_VALUE);
g.setKey("a");
g.setPayload("c");
diff --git a/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/src/test/java/org/apache/gossip/IdAndPropertyTest.java
index 1eb0aee..7c02d2d 100644
--- a/src/test/java/org/apache/gossip/IdAndPropertyTest.java
+++ b/src/test/java/org/apache/gossip/IdAndPropertyTest.java
@@ -28,43 +28,54 @@
import java.util.concurrent.TimeUnit;
import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
-import com.codahale.metrics.MetricRegistry;
-
import io.teknek.tunit.TUnit;
@RunWith(JUnitPlatform.class)
-public class IdAndPropertyTest {
+public class IdAndPropertyTest extends AbstractIntegrationBase {
@Test
- public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException{
+ public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException {
GossipSettings settings = new GossipSettings();
settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
- List<GossipMember> startupMembers = new ArrayList<>();
+ List<Member> startupMembers = new ArrayList<>();
Map<String, String> x = new HashMap<>();
x.put("a", "b");
x.put("datacenter", "dc1");
x.put("rack", "rack1");
- GossipService gossipService1 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0", x, startupMembers, settings,
- (a, b) -> {}, new MetricRegistry());
- gossipService1.start();
+ GossipManager gossipService1 = GossipManagerBuilder.newBuilder()
+ .cluster("a")
+ .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)))
+ .id("0")
+ .properties(x)
+ .gossipMembers(startupMembers)
+ .gossipSettings(settings).build();
+ gossipService1.init();
+ register(gossipService1);
Map<String, String> y = new HashMap<>();
y.put("a", "c");
y.put("datacenter", "dc2");
y.put("rack", "rack2");
- GossipService gossipService2 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", y,
- Arrays.asList(new RemoteGossipMember("a",
- new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0")),
- settings, (a, b) -> { }, new MetricRegistry());
- gossipService2.start();
+ GossipManager gossipService2 = GossipManagerBuilder.newBuilder().cluster("a")
+ .uri( new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)))
+ .id("1")
+ .properties(y)
+ .gossipMembers(Arrays.asList(new RemoteMember("a",
+ new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0")))
+ .gossipSettings(settings).build();
+ gossipService2.init();
+ register(gossipService2);
+
TUnit.assertThat(() -> {
String value = "";
try {
- value = gossipService1.getGossipManager().getLiveMembers().get(0).getProperties().get("a");
+ value = gossipService1.getLiveMembers().get(0).getProperties().get("a");
} catch (RuntimeException e){ }
return value;
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("c");
@@ -72,12 +83,9 @@
TUnit.assertThat(() -> {
String value = "";
try {
- value = gossipService2.getGossipManager().getLiveMembers().get(0).getProperties().get("a");
+ value = gossipService2.getLiveMembers().get(0).getProperties().get("a");
} catch (RuntimeException e){ }
return value;
- }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b");
- gossipService1.shutdown();
- gossipService2.shutdown();
-
+ }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b");
}
}
diff --git a/src/test/java/org/apache/gossip/GossipMemberTest.java b/src/test/java/org/apache/gossip/MemberTest.java
similarity index 80%
rename from src/test/java/org/apache/gossip/GossipMemberTest.java
rename to src/test/java/org/apache/gossip/MemberTest.java
index 272c7fb..5f0d18a 100644
--- a/src/test/java/org/apache/gossip/GossipMemberTest.java
+++ b/src/test/java/org/apache/gossip/MemberTest.java
@@ -27,14 +27,14 @@
import org.junit.runner.RunWith;
@RunWith(JUnitPlatform.class)
-public class GossipMemberTest {
+public class MemberTest {
@Test
public void testHashCodeFromGossip40() throws URISyntaxException {
Assert.assertNotEquals(
- new LocalGossipMember("mycluster", new URI("udp://4.4.4.4:1000"), "myid", 1, new HashMap<String,String>(), 10, 5, "exponential")
+ new LocalMember("mycluster", new URI("udp://4.4.4.4:1000"), "myid", 1, new HashMap<String,String>(), 10, 5, "exponential")
.hashCode(),
- new LocalGossipMember("mycluster", new URI("udp://4.4.4.5:1005"), "yourid", 11, new HashMap<String,String>(), 11, 6, "exponential")
+ new LocalMember("mycluster", new URI("udp://4.4.4.5:1005"), "yourid", 11, new HashMap<String,String>(), 11, 6, "exponential")
.hashCode());
}
}
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index 6a0765b..48fb2cb 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.gossip;
-import com.codahale.metrics.MetricRegistry;
import io.teknek.tunit.TUnit;
import java.net.URI;
@@ -25,13 +24,14 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.log4j.Logger;
import org.junit.platform.runner.JUnitPlatform;
@@ -52,25 +52,31 @@
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
int seedNodes = 3;
- List<GossipMember> startupMembers = new ArrayList<>();
+ List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i));
- startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
+ startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
- final List<GossipService> clients = Collections.synchronizedList(new ArrayList<GossipService>());
+ final List<GossipManager> clients = Collections.synchronizedList(new ArrayList<GossipManager>());
final int clusterMembers = 5;
for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i));
- GossipService gossipService = new GossipService(cluster, uri, i + "", new HashMap<String,String>(), startupMembers,
- settings, (a,b) -> {}, new MetricRegistry());
+ GossipManager gossipService = GossipManagerBuilder.newBuilder()
+ .cluster(cluster)
+ .uri(uri)
+ .id(i + "")
+ .gossipMembers(startupMembers)
+ .gossipSettings(settings)
+ .build();
clients.add(gossipService);
- gossipService.start();
+ gossipService.init();
+
}
TUnit.assertThat(new Callable<Integer>() {
public Integer call() throws Exception {
int total = 0;
for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).getGossipManager().getLiveMembers().size();
+ total += clients.get(i).getLiveMembers().size();
}
return total;
}
@@ -79,15 +85,15 @@
Random r = new Random();
int randomClientId = r.nextInt(clusterMembers);
log.info("shutting down " + randomClientId);
- final int shutdownPort = clients.get(randomClientId).getGossipManager().getMyself().getUri()
+ final int shutdownPort = clients.get(randomClientId).getMyself().getUri()
.getPort();
- final String shutdownId = clients.get(randomClientId).getGossipManager().getMyself().getId();
+ final String shutdownId = clients.get(randomClientId).getMyself().getId();
clients.get(randomClientId).shutdown();
TUnit.assertThat(new Callable<Integer>() {
public Integer call() throws Exception {
int total = 0;
for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).getGossipManager().getLiveMembers().size();
+ total += clients.get(i).getLiveMembers().size();
}
return total;
}
@@ -98,7 +104,7 @@
public Integer call() throws Exception {
int total = 0;
for (int i = 0; i < clusterMembers - 1; ++i) {
- total += clients.get(i).getGossipManager().getDeadMembers().size();
+ total += clients.get(i).getDeadMembers().size();
}
return total;
}
@@ -106,17 +112,22 @@
URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
// start client again
- GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", new HashMap<String,String>(), startupMembers,
- settings, (a,b) -> {}, new MetricRegistry());
+ GossipManager gossipService = GossipManagerBuilder.newBuilder()
+ .gossipSettings(settings)
+ .cluster(cluster)
+ .uri(uri)
+ .id(shutdownId+"")
+ .gossipMembers(startupMembers)
+ .build();
clients.add(gossipService);
- gossipService.start();
+ gossipService.init();
// verify that the client is alive again for every node
TUnit.assertThat(new Callable<Integer>() {
public Integer call() throws Exception {
int total = 0;
for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).getGossipManager().getLiveMembers().size();
+ total += clients.get(i).getLiveMembers().size();
}
return total;
}
diff --git a/src/test/java/org/apache/gossip/SignedMessageTest.java b/src/test/java/org/apache/gossip/SignedMessageTest.java
index 6bea974..50e3cb5 100644
--- a/src/test/java/org/apache/gossip/SignedMessageTest.java
+++ b/src/test/java/org/apache/gossip/SignedMessageTest.java
@@ -25,33 +25,36 @@
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.manager.PassiveGossipConstants;
import org.apache.gossip.secure.KeyTool;
import org.junit.Assert;
import org.junit.Test;
-import com.codahale.metrics.MetricRegistry;
-
import io.teknek.tunit.TUnit;
-public class SignedMessageTest {
+public class SignedMessageTest extends AbstractIntegrationBase {
- @Test(expected=IllegalArgumentException.class)
+ @Test(expected = IllegalArgumentException.class)
public void ifSignMustHaveKeys()
throws URISyntaxException, UnknownHostException, InterruptedException {
String cluster = UUID.randomUUID().toString();
GossipSettings settings = gossiperThatSigns();
- List<GossipMember> startupMembers = new ArrayList<>();
+ List<Member> startupMembers = new ArrayList<>();
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + 1));
- GossipService gossipService = new GossipService(cluster, uri, 1 + "",
- new HashMap<String, String>(), startupMembers, settings, (a, b) -> { },
- new MetricRegistry());
- gossipService.start();
+ GossipManager gossipService = GossipManagerBuilder.newBuilder()
+ .cluster(cluster)
+ .uri(uri)
+ .id(1 + "")
+ .gossipMembers(startupMembers)
+ .gossipSettings(settings)
+ .build();
+ gossipService.init();
}
private GossipSettings gossiperThatSigns(){
@@ -63,48 +66,52 @@
}
@Test
- public void dataTest() throws InterruptedException, URISyntaxException, NoSuchAlgorithmException, NoSuchProviderException, IOException{
+ public void dataTest() throws InterruptedException, URISyntaxException, NoSuchAlgorithmException, NoSuchProviderException, IOException {
String keys = "./keys";
GossipSettings settings = gossiperThatSigns();
setup(keys);
String cluster = UUID.randomUUID().toString();
- List<GossipMember> startupMembers = new ArrayList<>();
+ List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < 2; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i));
- startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
+ startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
- final List<GossipService> clients = new ArrayList<>();
+ final List<GossipManager> clients = new ArrayList<>();
for (int i = 1; i < 3; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i));
- GossipService gossipService = new GossipService(cluster, uri, i + "",
- new HashMap<String,String>(), startupMembers, settings,
- (a,b) -> {}, new MetricRegistry());
+ GossipManager gossipService = GossipManagerBuilder.newBuilder()
+ .cluster(cluster)
+ .uri(uri)
+ .id(i + "")
+ .gossipMembers(startupMembers)
+ .gossipSettings(settings)
+ .build();
+ gossipService.init();
clients.add(gossipService);
- gossipService.start();
}
assertTwoAlive(clients);
assertOnlySignedMessages(clients);
cleanup(keys, clients);
}
- private void assertTwoAlive(List<GossipService> clients){
+ private void assertTwoAlive(List<GossipManager> clients){
TUnit.assertThat(() -> {
int total = 0;
for (int i = 0; i < clients.size(); ++i) {
- total += clients.get(i).getGossipManager().getLiveMembers().size();
+ total += clients.get(i).getLiveMembers().size();
}
return total;
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
}
- private void assertOnlySignedMessages(List<GossipService> clients){
- Assert.assertEquals(0, clients.get(0).getGossipManager().getRegistry()
+ private void assertOnlySignedMessages(List<GossipManager> clients){
+ Assert.assertEquals(0, clients.get(0).getRegistry()
.meter(PassiveGossipConstants.UNSIGNED_MESSAGE).getCount());
- Assert.assertTrue(clients.get(0).getGossipManager().getRegistry()
+ Assert.assertTrue(clients.get(0).getRegistry()
.meter(PassiveGossipConstants.SIGNED_MESSAGE).getCount() > 0);
}
- private void cleanup(String keys, List<GossipService> clients){
+ private void cleanup(String keys, List<GossipManager> clients){
new File(keys, "1").delete();
new File(keys, "2").delete();
new File(keys).delete();
diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java
index aa4d255..d6c4a1e 100644
--- a/src/test/java/org/apache/gossip/StartupSettingsTest.java
+++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java
@@ -17,7 +17,8 @@
*/
package org.apache.gossip;
-import com.codahale.metrics.MetricRegistry;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Test;
@@ -27,8 +28,6 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.UUID;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
@@ -39,7 +38,7 @@
*/
@RunWith(JUnitPlatform.class)
public class StartupSettingsTest {
- private static final Logger log = Logger.getLogger( StartupSettingsTest.class );
+ private static final Logger log = Logger.getLogger(StartupSettingsTest.class);
private static final String CLUSTER = UUID.randomUUID().toString();
@Test
@@ -48,15 +47,17 @@
settingsFile.deleteOnExit();
writeSettingsFile(settingsFile);
URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000);
- final GossipService firstService = new GossipService(
- CLUSTER, uri, "1", new HashMap<String, String>(),
- new ArrayList<GossipMember>(), new GossipSettings(), null, new MetricRegistry());
- firstService.start();
- final GossipService serviceUnderTest = new GossipService(
- StartupSettings.fromJSONFile(settingsFile));
- serviceUnderTest.start();
+ GossipManager firstService = GossipManagerBuilder.newBuilder()
+ .cluster(CLUSTER)
+ .uri(uri)
+ .id("1")
+ .gossipSettings(new GossipSettings()).build();
+ firstService.init();
+ GossipManager manager = GossipManagerBuilder.newBuilder()
+ .startupSettings(StartupSettings.fromJSONFile(settingsFile)).build();
+ manager.init();
firstService.shutdown();
- serviceUnderTest.shutdown();
+ manager.shutdown();
}
private void writeSettingsFile( File target ) throws IOException {
diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index 8a9a9ab..8ae783e 100644
--- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -17,20 +17,20 @@
*/
package org.apache.gossip;
-import com.codahale.metrics.MetricRegistry;
import io.teknek.tunit.TUnit;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
import org.junit.jupiter.api.Test;
@RunWith(JUnitPlatform.class)
@@ -42,35 +42,40 @@
}
@Test
- public void testAgain() throws UnknownHostException, InterruptedException, URISyntaxException{
+ public void testAgain() throws UnknownHostException, InterruptedException, URISyntaxException {
abc(30100);
}
- public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException{
- GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 2.0, "exponential");
+ public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException {
+ GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential");
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
int seedNodes = 3;
- List<GossipMember> startupMembers = new ArrayList<>();
+ List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes+1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
- startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
+ startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
- final List<GossipService> clients = new ArrayList<>();
+ final List<GossipManager> clients = new ArrayList<>();
final int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
- GossipService gossipService = new GossipService(cluster, uri, i + "", new HashMap<String,String>(),
- startupMembers, settings, (a,b) -> {}, new MetricRegistry());
+ GossipManager gossipService = GossipManagerBuilder.newBuilder()
+ .cluster(cluster)
+ .uri(uri)
+ .id(i + "")
+ .gossipSettings(settings)
+ .gossipMembers(startupMembers)
+ .build();
+ gossipService.init();
clients.add(gossipService);
- gossipService.start();
}
TUnit.assertThat(new Callable<Integer> (){
public Integer call() throws Exception {
int total = 0;
for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).getGossipManager().getLiveMembers().size();
+ total += clients.get(i).getLiveMembers().size();
}
return total;
}}).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20);
diff --git a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
index 69d46b8..b00c0c3 100644
--- a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
+++ b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java
@@ -19,7 +19,7 @@
import java.net.URI;
-import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.LocalMember;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.jupiter.api.Test;
@@ -33,7 +33,7 @@
public void aNormalTest(){
int samples = 1;
int windowSize = 1000;
- LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"),
+ LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"),
"", 0L, null, windowSize, samples, "normal");
member.recordHeartbeat(5);
member.recordHeartbeat(10);
@@ -44,7 +44,7 @@
public void aTest(){
int samples = 1;
int windowSize = 1000;
- LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"),
+ LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"),
"", 0L, null, windowSize, samples, "exponential");
member.recordHeartbeat(5);
member.recordHeartbeat(10);
@@ -64,7 +64,7 @@
public void sameHeartbeatTest(){
int samples = 1;
int windowSize = 1000;
- LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"),
+ LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"),
"", 0L, null, windowSize, samples, "exponential");
member.recordHeartbeat(5);
member.recordHeartbeat(5);
diff --git a/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/src/test/java/org/apache/gossip/crdt/OrSetTest.java
index e576764..b3d8af3 100644
--- a/src/test/java/org/apache/gossip/crdt/OrSetTest.java
+++ b/src/test/java/org/apache/gossip/crdt/OrSetTest.java
@@ -21,18 +21,15 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.SortedSet;
import java.util.TreeSet;
-import org.apache.gossip.GossipService;
import org.apache.gossip.GossipSettings;
-import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
import org.junit.Assert;
import org.junit.Test;
-import com.codahale.metrics.MetricRegistry;
-
public class OrSetTest {
@Test
@@ -91,14 +88,16 @@
@Test
public void serialTest() throws InterruptedException, URISyntaxException, IOException {
- GossipService gossipService2 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", new HashMap<>(),
- Arrays.asList(new RemoteGossipMember("a",
- new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0")),
- new GossipSettings(), (a, b) -> { }, new MetricRegistry());
+ GossipManager gossipService2 = GossipManagerBuilder.newBuilder()
+ .cluster("a")
+ .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)))
+ .id("1")
+ .gossipSettings(new GossipSettings())
+ .build();
OrSet<Integer> i = new OrSet<Integer>(new OrSet.Builder<Integer>().add(1).remove(1));
- String s = gossipService2.getGossipManager().getObjectMapper().writeValueAsString(i);
+ String s = gossipService2.getObjectMapper().writeValueAsString(i);
@SuppressWarnings("unchecked")
- OrSet<Integer> back = gossipService2.getGossipManager().getObjectMapper().readValue(s, OrSet.class);
+ OrSet<Integer> back = gossipService2.getObjectMapper().readValue(s, OrSet.class);
Assert.assertEquals(back, i);
}
diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
index a9c861c..e328c24 100644
--- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java
+++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
@@ -21,9 +21,8 @@
import java.net.URI;
import org.apache.gossip.GossipSettings;
-import org.apache.gossip.manager.random.RandomGossipManager;
-import org.apache.gossip.model.GossipDataMessage;
-import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.SharedDataMessage;
import org.junit.Assert;
import org.junit.Test;
@@ -41,8 +40,8 @@
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
- GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings)
- .withId(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build();
+ GossipManager gm = GossipManagerBuilder.newBuilder().cluster("abc").gossipSettings(settings)
+ .id(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build();
gm.init();
gm.gossipPerNodeData(perNodeDatum(key, value));
gm.gossipSharedData(sharedDatum(key, value));
@@ -65,8 +64,8 @@
TUnit.assertThat(() -> gm.findSharedGossipData(key)).equals(null);
}
- private GossipDataMessage perNodeDatum(String key, String value) {
- GossipDataMessage m = new GossipDataMessage();
+ private PerNodeDataMessage perNodeDatum(String key, String value) {
+ PerNodeDataMessage m = new PerNodeDataMessage();
m.setExpireAt(System.currentTimeMillis() + 5L);
m.setKey(key);
m.setPayload(value);
@@ -74,8 +73,8 @@
return m;
}
- private SharedGossipDataMessage sharedDatum(String key, String value) {
- SharedGossipDataMessage m = new SharedGossipDataMessage();
+ private SharedDataMessage sharedDatum(String key, String value) {
+ SharedDataMessage m = new SharedDataMessage();
m.setExpireAt(System.currentTimeMillis() + 5L);
m.setKey(key);
m.setPayload(value);
@@ -89,11 +88,11 @@
String key = "key";
String value = "a";
GossipSettings settings = new GossipSettings();
- GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings)
- .withId(myId).uri(URI.create("udp://localhost:7000")).registry(registry).build();
+ GossipManager gm = GossipManagerBuilder.newBuilder().cluster("abc").gossipSettings(settings)
+ .id(myId).uri(URI.create("udp://localhost:7000")).registry(registry).build();
gm.init();
- GossipDataMessage before = perNodeDatum(key, value);
- GossipDataMessage after = perNodeDatum(key, "b");
+ PerNodeDataMessage before = perNodeDatum(key, value);
+ PerNodeDataMessage after = perNodeDatum(key, "b");
after.setTimestamp(after.getTimestamp() - 1);
gm.gossipPerNodeData(before);
Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
similarity index 77%
rename from src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
rename to src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
index 2d04087..8842643 100644
--- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
+++ b/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
@@ -18,14 +18,13 @@
package org.apache.gossip.manager;
import com.codahale.metrics.MetricRegistry;
-import org.apache.gossip.GossipMember;
+import org.apache.gossip.Member;
import org.apache.gossip.GossipSettings;
-import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.LocalMember;
import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
import org.apache.gossip.manager.handlers.MessageInvoker;
import org.apache.gossip.manager.handlers.ResponseHandler;
import org.apache.gossip.manager.handlers.SimpleMessageInvoker;
-import org.apache.gossip.manager.random.RandomGossipManager;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
@@ -43,47 +42,47 @@
import static org.junit.jupiter.api.Assertions.expectThrows;
@RunWith(JUnitPlatform.class)
-public class RandomGossipManagerBuilderTest {
+public class GossipManagerBuilderTest {
@Test
public void idShouldNotBeNull() {
expectThrows(IllegalArgumentException.class,() -> {
- RandomGossipManager.newBuilder().cluster("aCluster").build();
+ GossipManagerBuilder.newBuilder().cluster("aCluster").build();
});
}
@Test
public void clusterShouldNotBeNull() {
expectThrows(IllegalArgumentException.class,() -> {
- RandomGossipManager.newBuilder().withId("id").build();
+ GossipManagerBuilder.newBuilder().id("id").build();
});
}
@Test
public void settingsShouldNotBeNull() {
expectThrows(IllegalArgumentException.class,() -> {
- RandomGossipManager.newBuilder().withId("id").cluster("aCluster").build();
+ GossipManagerBuilder.newBuilder().id("id").cluster("aCluster").build();
});
}
@Test
public void createMembersListIfNull() throws URISyntaxException {
- RandomGossipManager gossipManager = RandomGossipManager.newBuilder()
- .withId("id")
+ GossipManager gossipManager = GossipManagerBuilder.newBuilder()
+ .id("id")
.cluster("aCluster")
.uri(new URI("udp://localhost:2000"))
- .settings(new GossipSettings())
+ .gossipSettings(new GossipSettings())
.gossipMembers(null).registry(new MetricRegistry()).build();
assertNotNull(gossipManager.getLiveMembers());
}
@Test
public void createDefaultMessageInvokerIfNull() throws URISyntaxException {
- RandomGossipManager gossipManager = RandomGossipManager.newBuilder()
- .withId("id")
+ GossipManager gossipManager = GossipManagerBuilder.newBuilder()
+ .id("id")
.cluster("aCluster")
.uri(new URI("udp://localhost:2000"))
- .settings(new GossipSettings())
+ .gossipSettings(new GossipSettings())
.messageInvoker(null).registry(new MetricRegistry()).build();
assertNotNull(gossipManager.getMessageInvoker());
Assert.assertEquals(gossipManager.getMessageInvoker().getClass(), new DefaultMessageInvoker().getClass());
@@ -92,11 +91,11 @@
@Test
public void testMessageInvokerKeeping() throws URISyntaxException {
MessageInvoker mi = new SimpleMessageInvoker(Response.class, new ResponseHandler());
- RandomGossipManager gossipManager = RandomGossipManager.newBuilder()
- .withId("id")
+ GossipManager gossipManager = GossipManagerBuilder.newBuilder()
+ .id("id")
.cluster("aCluster")
.uri(new URI("udp://localhost:2000"))
- .settings(new GossipSettings())
+ .gossipSettings(new GossipSettings())
.messageInvoker(mi).registry(new MetricRegistry()).build();
assertNotNull(gossipManager.getMessageInvoker());
Assert.assertEquals(gossipManager.getMessageInvoker(), mi);
@@ -104,15 +103,15 @@
@Test
public void useMemberListIfProvided() throws URISyntaxException {
- LocalGossipMember member = new LocalGossipMember(
+ LocalMember member = new LocalMember(
"aCluster", new URI("udp://localhost:2000"), "aGossipMember",
System.nanoTime(), new HashMap<String, String>(), 1000, 1, "exponential");
- List<GossipMember> memberList = new ArrayList<>();
+ List<Member> memberList = new ArrayList<>();
memberList.add(member);
- RandomGossipManager gossipManager = RandomGossipManager.newBuilder()
- .withId("id")
+ GossipManager gossipManager = GossipManagerBuilder.newBuilder()
+ .id("id")
.cluster("aCluster")
- .settings(new GossipSettings())
+ .gossipSettings(new GossipSettings())
.uri(new URI("udp://localhost:8000"))
.gossipMembers(memberList).registry(new MetricRegistry()).build();
assertEquals(1, gossipManager.getDeadMembers().size());
diff --git a/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java b/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java
index 6e41bdc..d448b98 100644
--- a/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java
+++ b/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java
@@ -22,15 +22,11 @@
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Arrays;
-import java.util.HashMap;
-import org.apache.gossip.GossipService;
import org.apache.gossip.GossipSettings;
-import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.RemoteMember;
import org.junit.Assert;
import org.junit.Test;
-import com.codahale.metrics.MetricRegistry;
-
public class RingPersistenceTest {
@Test
@@ -43,21 +39,26 @@
}
private File aGossiperPersists(GossipSettings settings) throws UnknownHostException, InterruptedException, URISyntaxException {
- GossipService gossipService = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", new HashMap<String, String>(),
- Arrays.asList(
- new RemoteGossipMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0"),
- new RemoteGossipMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 2)), "2")
- ),
- settings, (a, b) -> { }, new MetricRegistry());
- gossipService.getGossipManager().getRingState().writeToDisk();
- return gossipService.getGossipManager().getRingState().computeTarget();
+ GossipManager gossipService = GossipManagerBuilder.newBuilder()
+ .cluster("a")
+ .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)))
+ .id("1")
+ .gossipSettings(settings)
+ .gossipMembers(
+ Arrays.asList(
+ new RemoteMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0"),
+ new RemoteMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 2)), "2"))).build();
+ gossipService.getRingState().writeToDisk();
+ return gossipService.getRingState().computeTarget();
}
- private void aNewInstanceGetsRingInfo(GossipSettings settings) throws UnknownHostException, InterruptedException, URISyntaxException{
- GossipService gossipService2 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", new HashMap<String, String>(),
- Arrays.asList(),
- settings, (a, b) -> { }, new MetricRegistry());
- Assert.assertEquals(2, gossipService2.getGossipManager().getMembers().size());
+ private void aNewInstanceGetsRingInfo(GossipSettings settings) throws UnknownHostException, InterruptedException, URISyntaxException {
+ GossipManager gossipService2 = GossipManagerBuilder.newBuilder()
+ .cluster("a")
+ .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)))
+ .id("1")
+ .gossipSettings(settings).build();
+ Assert.assertEquals(2, gossipService2.getMembers().size());
}
}
diff --git a/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java b/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
index e0cbcf4..7b17e41 100644
--- a/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
+++ b/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
@@ -21,64 +21,65 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.gossip.GossipService;
import org.apache.gossip.GossipSettings;
-import org.apache.gossip.model.GossipDataMessage;
-import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.SharedDataMessage;
import org.junit.Assert;
import org.junit.Test;
-import com.codahale.metrics.MetricRegistry;
-
public class UserDataPersistenceTest {
+ String nodeId = "1";
+
+ private GossipManager sameService() throws URISyntaxException {
+ GossipSettings settings = new GossipSettings();
+ return GossipManagerBuilder.newBuilder()
+ .cluster("a")
+ .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)))
+ .id(nodeId)
+ .gossipSettings(settings).build();
+ }
+
@Test
public void givenThatRingIsPersisted() throws UnknownHostException, InterruptedException, URISyntaxException {
- String nodeId = "1";
- GossipSettings settings = new GossipSettings();
+
{ //Create a gossip service and force it to persist its user data
- GossipService gossipService = new GossipService("a",
- new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), nodeId, new HashMap<String, String>(),
- Arrays.asList(), settings, (a, b) -> { }, new MetricRegistry());
- gossipService.start();
+ GossipManager gossipService = sameService();
+ gossipService.init();
gossipService.gossipPerNodeData(getToothpick());
gossipService.gossipSharedData(getAnotherToothpick());
- gossipService.getGossipManager().getUserDataState().writePerNodeToDisk();
- gossipService.getGossipManager().getUserDataState().writeSharedToDisk();
+ gossipService.getUserDataState().writePerNodeToDisk();
+ gossipService.getUserDataState().writeSharedToDisk();
{ //read the raw data and confirm
- ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> l = gossipService.getGossipManager().getUserDataState().readPerNodeFromDisk();
+ ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> l = gossipService.getUserDataState().readPerNodeFromDisk();
Assert.assertEquals("red", ((AToothpick) l.get(nodeId).get("a").getPayload()).getColor());
}
{
- ConcurrentHashMap<String, SharedGossipDataMessage> l =
- gossipService.getGossipManager().getUserDataState().readSharedDataFromDisk();
+ ConcurrentHashMap<String, SharedDataMessage> l =
+ gossipService.getUserDataState().readSharedDataFromDisk();
Assert.assertEquals("blue", ((AToothpick) l.get("a").getPayload()).getColor());
}
gossipService.shutdown();
}
{ //recreate the service and see that the data is read back in
- GossipService gossipService = new GossipService("a",
- new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), nodeId, new HashMap<String, String>(),
- Arrays.asList(), settings, (a, b) -> { }, new MetricRegistry());
- gossipService.start();
- Assert.assertEquals("red", ((AToothpick) gossipService.findPerNodeData(nodeId, "a").getPayload()).getColor());
- Assert.assertEquals("blue", ((AToothpick) gossipService.findSharedData("a").getPayload()).getColor());
- File f = gossipService.getGossipManager().getUserDataState().computeSharedTarget();
- File g = gossipService.getGossipManager().getUserDataState().computePerNodeTarget();
+ GossipManager gossipService = sameService();
+ gossipService.init();
+ Assert.assertEquals("red", ((AToothpick) gossipService.findPerNodeGossipData(nodeId, "a").getPayload()).getColor());
+ Assert.assertEquals("blue", ((AToothpick) gossipService.findSharedGossipData("a").getPayload()).getColor());
+ File f = gossipService.getUserDataState().computeSharedTarget();
+ File g = gossipService.getUserDataState().computePerNodeTarget();
gossipService.shutdown();
f.delete();
g.delete();
}
}
- public GossipDataMessage getToothpick(){
+ public PerNodeDataMessage getToothpick(){
AToothpick a = new AToothpick();
a.setColor("red");
- GossipDataMessage d = new GossipDataMessage();
+ PerNodeDataMessage d = new PerNodeDataMessage();
d.setExpireAt(Long.MAX_VALUE);
d.setKey("a");
d.setPayload(a);
@@ -86,10 +87,10 @@
return d;
}
- public SharedGossipDataMessage getAnotherToothpick(){
+ public SharedDataMessage getAnotherToothpick(){
AToothpick a = new AToothpick();
a.setColor("blue");
- SharedGossipDataMessage d = new SharedGossipDataMessage();
+ SharedDataMessage d = new SharedDataMessage();
d.setExpireAt(Long.MAX_VALUE);
d.setKey("a");
d.setPayload(a);
diff --git a/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java b/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java
index d402d59..571d7ba 100644
--- a/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java
+++ b/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java
@@ -21,7 +21,7 @@
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.ActiveGossipMessage;
import org.apache.gossip.model.Base;
-import org.apache.gossip.udp.UdpSharedGossipDataMessage;
+import org.apache.gossip.udp.UdpSharedDataMessage;
import org.junit.Assert;
import org.junit.Test;
@@ -166,7 +166,7 @@
//UdpSharedGossipDataMessage with null gossipCore -> exception
boolean thrown = false;
try {
- mi.invoke(null, null, new UdpSharedGossipDataMessage());
+ mi.invoke(null, null, new UdpSharedDataMessage());
} catch (NullPointerException e) {
thrown = true;
}