GOSSIP-36 Persist ring state
diff --git a/src/main/java/org/apache/gossip/GossipMember.java b/src/main/java/org/apache/gossip/GossipMember.java
index f2834bd..703ac55 100644
--- a/src/main/java/org/apache/gossip/GossipMember.java
+++ b/src/main/java/org/apache/gossip/GossipMember.java
@@ -28,11 +28,11 @@
public abstract class GossipMember implements Comparable<GossipMember> {
- protected final URI uri;
+ protected URI uri;
protected volatile long heartbeat;
- protected final String clusterName;
+ protected String clusterName;
/**
* The purpose of the id field is to be able for nodes to identify themselves beyond their
@@ -64,6 +64,7 @@
this.properties = properties;
}
+ protected GossipMember(){}
/**
* Get the name of the cluster the member belongs to.
*
@@ -78,7 +79,7 @@
* @return The member address in the form IP/host:port Similar to the toString in
* {@link InetSocketAddress}
*/
- public String getAddress() {
+ public String computeAddress() {
return uri.getHost() + ":" + uri.getPort();
}
@@ -118,7 +119,7 @@
}
public String toString() {
- return "Member [address=" + getAddress() + ", id=" + id + ", heartbeat=" + heartbeat + "]";
+ return "Member [address=" + computeAddress() + ", id=" + id + ", heartbeat=" + heartbeat + "]";
}
/**
@@ -128,7 +129,7 @@
public int hashCode() {
final int prime = 31;
int result = 1;
- String address = getAddress();
+ String address = computeAddress();
result = prime * result + ((address == null) ? 0 : address.hashCode()) + (clusterName == null ? 0
: clusterName.hashCode());
return result;
@@ -155,11 +156,11 @@
return false;
}
// The object is the same of they both have the same address (hostname and port).
- return getAddress().equals(((LocalGossipMember) obj).getAddress())
+ return computeAddress().equals(((LocalGossipMember) obj).computeAddress())
&& getClusterName().equals(((LocalGossipMember) obj).getClusterName());
}
public int compareTo(GossipMember other) {
- return this.getAddress().compareTo(other.getAddress());
+ return this.computeAddress().compareTo(other.computeAddress());
}
}
diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java
index 1fed914..60a443f 100644
--- a/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/src/main/java/org/apache/gossip/GossipSettings.java
@@ -40,7 +40,7 @@
/** the threshold for the detector */
//private double convictThreshold = 2.606201185901408;
- private double convictThreshold = 4.5;
+ private double convictThreshold = 2.606201185901408;
private String distribution = "exponential";
@@ -48,6 +48,14 @@
private Map<String,String> activeGossipProperties = new HashMap<>();
+ private String pathToRingState = "./";
+
+ private boolean persistRingState = true;
+
+ private String pathToDataState = "./";
+
+ private boolean persistDataState = true;
+
/**
* Construct GossipSettings with default settings.
*/
@@ -162,5 +170,37 @@
public void setActiveGossipProperties(Map<String, String> activeGossipProperties) {
this.activeGossipProperties = activeGossipProperties;
}
+
+ public String getPathToRingState() {
+ return pathToRingState;
+ }
+
+ public void setPathToRingState(String pathToRingState) {
+ this.pathToRingState = pathToRingState;
+ }
+
+ public boolean isPersistRingState() {
+ return persistRingState;
+ }
+
+ public void setPersistRingState(boolean persistRingState) {
+ this.persistRingState = persistRingState;
+ }
+
+ public String getPathToDataState() {
+ return pathToDataState;
+ }
+
+ public void setPathToDataState(String pathToDataState) {
+ this.pathToDataState = pathToDataState;
+ }
+
+ public boolean isPersistDataState() {
+ return persistDataState;
+ }
+
+ public void setPersistDataState(boolean persistDataState) {
+ this.persistDataState = persistDataState;
+ }
}
diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java
index 557ffcb..05874f5 100644
--- a/src/main/java/org/apache/gossip/LocalGossipMember.java
+++ b/src/main/java/org/apache/gossip/LocalGossipMember.java
@@ -29,7 +29,7 @@
*/
public class LocalGossipMember extends GossipMember {
/** The failure detector for this member */
- private transient final FailureDetector detector;
+ private transient FailureDetector detector;
/**
*
@@ -46,6 +46,10 @@
detector = new FailureDetector(this, minSamples, windowSize, distribution);
}
+ protected LocalGossipMember(){
+
+ }
+
public void recordHeartbeat(long now){
detector.recordHeartbeat(now);
}
diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java
index 0117be7..ab5f764 100644
--- a/src/main/java/org/apache/gossip/StartupSettings.java
+++ b/src/main/java/org/apache/gossip/StartupSettings.java
@@ -198,7 +198,7 @@
RemoteGossipMember member = new RemoteGossipMember(child.get("cluster").asText(),
uri3, "", 0, new HashMap<String,String>());
settings.addGossipMember(member);
- configMembersDetails += member.getAddress();
+ configMembersDetails += member.computeAddress();
configMembersDetails += ", ";
}
log.info(configMembersDetails + "]");
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
new file mode 100644
index 0000000..dfeabd7
--- /dev/null
+++ b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gossip.examples;
+
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
+
+import com.codahale.metrics.MetricRegistry;
+
+public class StandAloneDatacenterAndRack {
+
+ public static void main (String [] args) throws UnknownHostException, InterruptedException {
+ GossipSettings s = new GossipSettings();
+ s.setWindowSize(10);
+ s.setConvictThreshold(1.0);
+ s.setGossipInterval(1000);
+ s.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
+ Map<String, String> gossipProps = new HashMap<>();
+ gossipProps.put("sameRackGossipIntervalMs", "2000");
+ gossipProps.put("differentDatacenterGossipIntervalMs", "10000");
+ s.setActiveGossipProperties(gossipProps);
+
+
+ Map<String, String> props = new HashMap<>();
+ props.put(DatacenterRackAwareActiveGossiper.DATACENTER, args[4]);
+ props.put(DatacenterRackAwareActiveGossiper.RACK, args[5]);
+ GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1],
+ props, Arrays.asList(new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])),
+ s, (a, b) -> { }, new MetricRegistry());
+ gossipService.start();
+ while (true){
+ System.out.println("Live: " + gossipService.getGossipManager().getLiveMembers());
+ System.out.println("Dead: " + gossipService.getGossipManager().getDeadMembers());
+ Thread.sleep(2000);
+ }
+ }
+}
diff --git a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
index 40b9c28..4f5dfdc 100644
--- a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
+++ b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
@@ -180,7 +180,8 @@
}
private void sendToSameRackMember() {
- sendMembershipList(gossipManager.getMyself(), selectPartner(sameRackNodes()));
+ LocalGossipMember i = selectPartner(sameRackNodes());
+ sendMembershipList(gossipManager.getMyself(), i);
}
private void sendToSameRackMemberPerNode() {
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 840efb9..04afc28 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -54,35 +54,24 @@
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
private final ConcurrentSkipListMap<LocalGossipMember, GossipState> members;
-
private final LocalGossipMember me;
-
private final GossipSettings settings;
-
private final AtomicBoolean gossipServiceRunning;
-
private final GossipListener listener;
-
private AbstractActiveGossiper activeGossipThread;
-
private PassiveGossipThread passiveGossipThread;
-
private ExecutorService gossipThreadExecutor;
-
private final GossipCore gossipCore;
-
private final DataReaper dataReaper;
-
private final Clock clock;
-
private final ScheduledExecutorService scheduledServiced;
-
- private MetricRegistry registry;
-
+ private final MetricRegistry registry;
+ private final RingStatePersister ringState;
+ private final UserDataPersister userDataState;
+
public GossipManager(String cluster,
URI uri, String id, Map<String,String> properties, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
-
this.settings = settings;
gossipCore = new GossipCore(this, registry);
clock = new SystemClock();
@@ -105,6 +94,10 @@
this.listener = listener;
this.scheduledServiced = Executors.newScheduledThreadPool(1);
this.registry = registry;
+ this.ringState = new RingStatePersister(this);
+ this.userDataState = new UserDataPersister(this, this.gossipCore);
+ readSavedRingState();
+ readSavedDataState();
}
public ConcurrentSkipListMap<LocalGossipMember, GossipState> getMembers() {
@@ -150,6 +143,7 @@
throw new RuntimeException(e);
}
}
+
/**
* Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
* thread and start the receiver thread.
@@ -160,13 +154,14 @@
activeGossipThread = constructActiveGossiper();
activeGossipThread.init();
dataReaper.init();
+ scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
+ scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
scheduledServiced.scheduleAtFixedRate(() -> {
try {
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
Double result = null;
try {
result = entry.getKey().detect(clock.nanoTime());
- //System.out.println(entry.getKey() +" "+ result);
if (result != null) {
if (result > settings.getConvictThreshold() && entry.getValue() == GossipState.UP) {
members.put(entry.getKey(), GossipState.DOWN);
@@ -195,6 +190,27 @@
LOGGER.debug("The GossipManager is started.");
}
+ private void readSavedRingState() {
+ for (LocalGossipMember l : ringState.readFromDisk()){
+ LocalGossipMember member = new LocalGossipMember(l.getClusterName(),
+ l.getUri(), l.getId(),
+ clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
+ settings.getMinimumSamples(), settings.getDistribution());
+ members.putIfAbsent(member, GossipState.DOWN);
+ }
+ }
+
+ private void readSavedDataState() {
+ for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){
+ for (Entry<String, GossipDataMessage> j : l.getValue().entrySet()){
+ gossipCore.addPerNodeData(j.getValue());
+ }
+ }
+ for (Entry<String, SharedGossipDataMessage> l: userDataState.readSharedDataFromDisk().entrySet()){
+ gossipCore.addSharedData(l.getValue());
+ }
+ }
+
/**
* Shutdown the gossip service.
*/
@@ -217,6 +233,14 @@
} catch (InterruptedException e) {
LOGGER.error(e);
}
+ gossipThreadExecutor.shutdownNow();
+ scheduledServiced.shutdown();
+ try {
+ scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.error(e);
+ }
+ scheduledServiced.shutdownNow();
}
public void gossipPerNodeData(GossipDataMessage message){
@@ -266,6 +290,13 @@
public DataReaper getDataReaper() {
return dataReaper;
}
+
+ public RingStatePersister getRingState() {
+ return ringState;
+ }
+ public UserDataPersister getUserDataState() {
+ return userDataState;
+ }
}
diff --git a/src/main/java/org/apache/gossip/manager/RingStatePersister.java b/src/main/java/org/apache/gossip/manager/RingStatePersister.java
new file mode 100644
index 0000000..24b464a
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/RingStatePersister.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.gossip.LocalGossipMember;
+import org.apache.log4j.Logger;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class RingStatePersister implements Runnable {
+
+ private static final Logger LOGGER = Logger.getLogger(RingStatePersister.class);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final TypeReference<ArrayList<LocalGossipMember>> REF
+ = new TypeReference<ArrayList<LocalGossipMember>>() { };
+ private GossipManager parent;
+
+ public RingStatePersister(GossipManager parent){
+ this.parent = parent;
+ }
+
+ @Override
+ public void run() {
+ writeToDisk();
+ }
+
+ File computeTarget(){
+ return new File(parent.getSettings().getPathToRingState(), "ringstate." + parent.getMyself().getClusterName() + "."
+ + parent.getMyself().getId() + ".json");
+ }
+
+ void writeToDisk(){
+ if (!parent.getSettings().isPersistRingState()){
+ return;
+ }
+ NavigableSet<LocalGossipMember> i = parent.getMembers().keySet();
+ try (FileOutputStream fos = new FileOutputStream(computeTarget())){
+ MAPPER.writeValue(fos, i);
+ } catch (IOException e) {
+ LOGGER.debug(e);
+ }
+ }
+
+ List<LocalGossipMember> readFromDisk(){
+ if (!parent.getSettings().isPersistRingState()){
+ return Collections.emptyList();
+ }
+ try (FileInputStream fos = new FileInputStream(computeTarget())){
+ return MAPPER.readValue(fos, REF);
+ } catch (IOException e) {
+ LOGGER.debug(e);
+ }
+ return Collections.emptyList();
+ }
+
+}
diff --git a/src/main/java/org/apache/gossip/manager/UserDataPersister.java b/src/main/java/org/apache/gossip/manager/UserDataPersister.java
new file mode 100644
index 0000000..c67677a
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/UserDataPersister.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.log4j.Logger;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class UserDataPersister implements Runnable {
+
+ private static final Logger LOGGER = Logger.getLogger(UserDataPersister.class);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+ private final GossipManager parent;
+ private final GossipCore gossipCore;
+
+ UserDataPersister(GossipManager parent, GossipCore gossipCore){
+ this.parent = parent;
+ this.gossipCore = gossipCore;
+ MAPPER.enableDefaultTyping();
+ }
+
+ File computeSharedTarget(){
+ return new File(parent.getSettings().getPathToDataState(), "shareddata."
+ + parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json");
+ }
+
+ File computePerNodeTarget() {
+ return new File(parent.getSettings().getPathToDataState(), "pernodedata."
+ + parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json");
+ }
+
+ @SuppressWarnings("unchecked")
+ ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> readPerNodeFromDisk(){
+ if (!parent.getSettings().isPersistDataState()){
+ return new ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>();
+ }
+ try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){
+ return MAPPER.readValue(fos, ConcurrentHashMap.class);
+ } catch (IOException e) {
+ LOGGER.debug(e);
+ }
+ return new ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>();
+ }
+
+ void writePerNodeToDisk(){
+ if (!parent.getSettings().isPersistDataState()){
+ return;
+ }
+ try (FileOutputStream fos = new FileOutputStream(computePerNodeTarget())){
+ MAPPER.writeValue(fos, gossipCore.getPerNodeData());
+ } catch (IOException e) {
+ LOGGER.warn(e);
+ }
+ }
+
+ void writeSharedToDisk(){
+ if (!parent.getSettings().isPersistDataState()){
+ return;
+ }
+ try (FileOutputStream fos = new FileOutputStream(computeSharedTarget())){
+ MAPPER.writeValue(fos, gossipCore.getSharedData());
+ } catch (IOException e) {
+ LOGGER.warn(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ ConcurrentHashMap<String, SharedGossipDataMessage> readSharedDataFromDisk(){
+ if (!parent.getSettings().isPersistRingState()){
+ return new ConcurrentHashMap<String, SharedGossipDataMessage>();
+ }
+ try (FileInputStream fos = new FileInputStream(computeSharedTarget())){
+ return MAPPER.readValue(fos, ConcurrentHashMap.class);
+ } catch (IOException e) {
+ LOGGER.debug(e);
+ }
+ return new ConcurrentHashMap<String, SharedGossipDataMessage>();
+ }
+
+ /**
+ * Writes all pernode and shared data to disk
+ */
+ @Override
+ public void run() {
+ writePerNodeToDisk();
+ writeSharedToDisk();
+ }
+}
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
index 83879f9..98c7ee0 100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -39,6 +39,8 @@
@Test
public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
GossipSettings settings = new GossipSettings();
+ settings.setPersistRingState(false);
+ settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<GossipMember> startupMembers = new ArrayList<>();
diff --git a/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/src/test/java/org/apache/gossip/IdAndPropertyTest.java
index 2a98f01..1eb0aee 100644
--- a/src/test/java/org/apache/gossip/IdAndPropertyTest.java
+++ b/src/test/java/org/apache/gossip/IdAndPropertyTest.java
@@ -25,7 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeUnit;
import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
import org.junit.jupiter.api.Test;
@@ -75,8 +75,7 @@
value = gossipService2.getGossipManager().getLiveMembers().get(0).getProperties().get("a");
} catch (RuntimeException e){ }
return value;
- }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b");
-
+ }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b");
gossipService1.shutdown();
gossipService2.shutdown();
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index 9d02556..2386084 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -47,7 +47,9 @@
@Test
public void DeadNodesDoNotComeAliveAgain()
throws InterruptedException, UnknownHostException, URISyntaxException {
- GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 5.0, "exponential");
+ GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 2.0, "exponential");
+ settings.setPersistRingState(false);
+ settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
int seedNodes = 3;
List<GossipMember> startupMembers = new ArrayList<>();
diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index bc4004d..aa797f5 100644
--- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -48,6 +48,8 @@
public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException{
GossipSettings settings = new GossipSettings();
+ settings.setPersistRingState(false);
+ settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
int seedNodes = 3;
List<GossipMember> startupMembers = new ArrayList<>();
diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
index b4ac45d..a9c861c 100644
--- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java
+++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
@@ -39,6 +39,8 @@
@Test
public void testReaperOneShot() {
GossipSettings settings = new GossipSettings();
+ settings.setPersistRingState(false);
+ settings.setPersistDataState(false);
GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings)
.withId(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build();
gm.init();
diff --git a/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java b/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java
new file mode 100644
index 0000000..6e41bdc
--- /dev/null
+++ b/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.HashMap;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.RemoteGossipMember;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+
+public class RingPersistenceTest {
+
+ @Test
+ public void givenThatRingIsPersisted() throws UnknownHostException, InterruptedException, URISyntaxException {
+ GossipSettings settings = new GossipSettings();
+ File f = aGossiperPersists(settings);
+ Assert.assertTrue(f.exists());
+ aNewInstanceGetsRingInfo(settings);
+ f.delete();
+ }
+
+ private File aGossiperPersists(GossipSettings settings) throws UnknownHostException, InterruptedException, URISyntaxException {
+ GossipService gossipService = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", new HashMap<String, String>(),
+ Arrays.asList(
+ new RemoteGossipMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0"),
+ new RemoteGossipMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 2)), "2")
+ ),
+ settings, (a, b) -> { }, new MetricRegistry());
+ gossipService.getGossipManager().getRingState().writeToDisk();
+ return gossipService.getGossipManager().getRingState().computeTarget();
+ }
+
+ private void aNewInstanceGetsRingInfo(GossipSettings settings) throws UnknownHostException, InterruptedException, URISyntaxException{
+ GossipService gossipService2 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", new HashMap<String, String>(),
+ Arrays.asList(),
+ settings, (a, b) -> { }, new MetricRegistry());
+ Assert.assertEquals(2, gossipService2.getGossipManager().getMembers().size());
+ }
+
+}
diff --git a/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java b/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
new file mode 100644
index 0000000..e0cbcf4
--- /dev/null
+++ b/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.SharedGossipDataMessage;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+
+public class UserDataPersistenceTest {
+
+ @Test
+ public void givenThatRingIsPersisted() throws UnknownHostException, InterruptedException, URISyntaxException {
+ String nodeId = "1";
+ GossipSettings settings = new GossipSettings();
+ { //Create a gossip service and force it to persist its user data
+ GossipService gossipService = new GossipService("a",
+ new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), nodeId, new HashMap<String, String>(),
+ Arrays.asList(), settings, (a, b) -> { }, new MetricRegistry());
+ gossipService.start();
+ gossipService.gossipPerNodeData(getToothpick());
+ gossipService.gossipSharedData(getAnotherToothpick());
+ gossipService.getGossipManager().getUserDataState().writePerNodeToDisk();
+ gossipService.getGossipManager().getUserDataState().writeSharedToDisk();
+ { //read the raw data and confirm
+ ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> l = gossipService.getGossipManager().getUserDataState().readPerNodeFromDisk();
+ Assert.assertEquals("red", ((AToothpick) l.get(nodeId).get("a").getPayload()).getColor());
+ }
+ {
+ ConcurrentHashMap<String, SharedGossipDataMessage> l =
+ gossipService.getGossipManager().getUserDataState().readSharedDataFromDisk();
+ Assert.assertEquals("blue", ((AToothpick) l.get("a").getPayload()).getColor());
+ }
+ gossipService.shutdown();
+ }
+ { //recreate the service and see that the data is read back in
+ GossipService gossipService = new GossipService("a",
+ new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), nodeId, new HashMap<String, String>(),
+ Arrays.asList(), settings, (a, b) -> { }, new MetricRegistry());
+ gossipService.start();
+ Assert.assertEquals("red", ((AToothpick) gossipService.findPerNodeData(nodeId, "a").getPayload()).getColor());
+ Assert.assertEquals("blue", ((AToothpick) gossipService.findSharedData("a").getPayload()).getColor());
+ File f = gossipService.getGossipManager().getUserDataState().computeSharedTarget();
+ File g = gossipService.getGossipManager().getUserDataState().computePerNodeTarget();
+ gossipService.shutdown();
+ f.delete();
+ g.delete();
+ }
+ }
+
+ public GossipDataMessage getToothpick(){
+ AToothpick a = new AToothpick();
+ a.setColor("red");
+ GossipDataMessage d = new GossipDataMessage();
+ d.setExpireAt(Long.MAX_VALUE);
+ d.setKey("a");
+ d.setPayload(a);
+ d.setTimestamp(System.currentTimeMillis());
+ return d;
+ }
+
+ public SharedGossipDataMessage getAnotherToothpick(){
+ AToothpick a = new AToothpick();
+ a.setColor("blue");
+ SharedGossipDataMessage d = new SharedGossipDataMessage();
+ d.setExpireAt(Long.MAX_VALUE);
+ d.setKey("a");
+ d.setPayload(a);
+ d.setTimestamp(System.currentTimeMillis());
+ return d;
+ }
+
+ public static class AToothpick {
+ private String color;
+ public AToothpick(){
+
+ }
+ public String getColor() {
+ return color;
+ }
+ public void setColor(String color) {
+ this.color = color;
+ }
+
+ }
+}