GOSSIP-71: fix OrSet not merging correctly (egc & maxim)
diff --git a/src/main/java/org/apache/gossip/crdt/OrSet.java b/src/main/java/org/apache/gossip/crdt/OrSet.java
index 972377f..f84dbc7 100644
--- a/src/main/java/org/apache/gossip/crdt/OrSet.java
+++ b/src/main/java/org/apache/gossip/crdt/OrSet.java
@@ -17,16 +17,9 @@
*/
package org.apache.gossip.crdt;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
+import java.util.function.BiConsumer;
import org.apache.gossip.crdt.OrSet.Builder.Operation;
@@ -86,11 +79,34 @@
val = computeValue();
}
+  /**
+   * Unions two sets of ids into a freshly allocated set; neither argument is mutated.
+   *
+   * @param a first set of ids, may be null or empty
+   * @param b second set of ids, may be null or empty
+   * @return a new set holding every id of {@code a} and {@code b}, or {@code null} when both
+   *         inputs are null or empty (as a {@code Map.merge} remapping function, a null
+   *         result removes the mapping)
+   */
+  static Set<UUID> mergeSets(Set<UUID> a, Set<UUID> b) {
+    Set<UUID> res = new HashSet<>();
+    // Null-check each side on its own: a single null argument paired with a non-empty one
+    // used to reach new HashSet<>(null) / addAll(null) and throw NullPointerException.
+    if (a != null) {
+      res.addAll(a);
+    }
+    if (b != null) {
+      res.addAll(b);
+    }
+    return res.isEmpty() ? null : res;
+  }
+
+  /**
+   * Folds {@code value} into the set stored under {@code key}, combining it with any set
+   * already present via {@code mergeSets}; a null {@code value} leaves {@code map} untouched.
+   */
+  private void internalSetMerge(Map<E, Set<UUID>> map, E key, Set<UUID> value) {
+    if (value != null) {
+      map.merge(key, value, OrSet::mergeSets);
+    }
+  }
+
public OrSet(OrSet<E> left, OrSet<E> right){
- elements.putAll(left.elements);
- elements.putAll(right.elements);
- tombstones.putAll(left.tombstones);
- tombstones.putAll(right.tombstones);
+ BiConsumer<Map<E, Set<UUID>>, Map<E, Set<UUID>>> internalMerge = (items, other) -> { // folds every entry of other into items
+ for (Entry<E, Set<UUID>> l : other.entrySet()){
+ internalSetMerge(items, l.getKey(), l.getValue()); // unions the id sets per key, where the removed putAll replaced them
+ }
+ };
+
+ internalMerge.accept(elements, left.elements); // ids recorded for each element, from both operands
+ internalMerge.accept(elements, right.elements);
+ internalMerge.accept(tombstones, left.tombstones); // tombstoned ids, from both operands
+ internalMerge.accept(tombstones, right.tombstones);
+
val = computeValue();
}
@@ -103,29 +119,14 @@
return new OrSet<E>(this, other);
}
- private void internalAdd(E element){
- Set<UUID> l = elements.get(element);
- if (l == null){
- Set<UUID> d = new HashSet<UUID>();
- d.add(UUID.randomUUID());
- elements.put(element, d);
- } else {
- l.add(UUID.randomUUID());
- }
+ private void internalAdd(E element) { // records one more add of element under a new random id
+ Set<UUID> toMerge = new HashSet<>();
+ toMerge.add(UUID.randomUUID()); // fresh id that tags this particular add
+ internalSetMerge(elements, element, toMerge); // unioned with ids already stored for element, if any
}
private void internalRemove(E element){
- Set<UUID> elementIds = elements.get(element);
- if (elementIds == null){
- //deleting elements not in the list
- return;
- }
- Set<UUID> current = tombstones.get(element);
- if (current != null){
- current.addAll(elementIds);
- } else {
- tombstones.put(element, elementIds);
- }
+ internalSetMerge(tombstones, element, elements.get(element)); // tombstones every id currently stored for element; a null lookup (unknown element) is ignored by internalSetMerge
}
/*
@@ -134,18 +135,10 @@
private Set<E> computeValue(){
Set<E> values = new HashSet<>();
for (Entry<E, Set<UUID>> entry: elements.entrySet()){
- if (entry.getValue() == null || entry.getValue().size() == 0){
- continue;
- }
Set<UUID> deleteIds = tombstones.get(entry.getKey());
- if (deleteIds == null){
+ // if not all tokens for current element are in tombstones
+ if (deleteIds == null || !deleteIds.containsAll(entry.getValue())) {
values.add(entry.getKey());
- } else {
- if (!deleteIds.containsAll(entry.getValue())){
- values.add(entry.getKey());
- } else {
- //if all the entry uuid is deleted the entry is deleted
- }
}
}
return values;
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
new file mode 100644
index 0000000..d1c1751
--- /dev/null
+++ b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.examples;
+
+import com.codahale.metrics.MetricRegistry;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.crdt.OrSet;
+import org.apache.gossip.model.SharedGossipDataMessage;
+
+/**
+ * Interactive example node that shares a single {@code OrSet<String>} under the key "abc".
+ *
+ * <p>Expects four arguments: local URI, local node id, seed URI, seed node id. Each line read
+ * from standard input is a command: its first character selects the operation ({@code a} adds,
+ * anything else removes) and the text from the third character onward is the value.
+ */
+public class StandAloneNodeCrdtOrSet {
+  /** Key under which the shared set is stored and looked up. */
+  private static final String CRDT_KEY = "abc";
+
+  /**
+   * Starts the gossip service, a status-printing thread, and the stdin command loop.
+   *
+   * @param args local URI, local node id, seed URI, seed node id
+   * @throws InterruptedException not thrown by the visible body; kept for signature compatibility
+   * @throws IOException if reading standard input fails
+   */
+  public static void main (String [] args) throws InterruptedException, IOException{
+    if (args.length < 4){
+      // Too few arguments previously surfaced as a bare ArrayIndexOutOfBoundsException.
+      System.err.println("usage: StandAloneNodeCrdtOrSet <uri> <id> <seed-uri> <seed-id>");
+      return;
+    }
+    GossipSettings s = new GossipSettings();
+    s.setWindowSize(10);
+    s.setConvictThreshold(1.0);
+    s.setGossipInterval(10);
+    GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], new HashMap<String, String>(),
+        Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry());
+    gossipService.start();
+
+    new Thread(() -> {
+      while (true){
+        System.out.println("Live: " + gossipService.getGossipManager().getLiveMembers());
+        System.out.println("Dead: " + gossipService.getGossipManager().getDeadMembers());
+        System.out.println("---------- " + (gossipService.getGossipManager().findCrdt(CRDT_KEY) == null ? "":
+            gossipService.getGossipManager().findCrdt(CRDT_KEY).value()));
+        System.out.println("********** " + gossipService.getGossipManager().findCrdt(CRDT_KEY));
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          // Restore the flag and stop printing instead of silently swallowing the interrupt.
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    }).start();
+
+    String line = null;
+    try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))){
+      while ( (line = br.readLine()) != null){
+        System.out.println(line);
+        if (line.length() < 2){
+          // charAt(0) / substring(2) below would throw on such a line; skip it instead.
+          System.out.println("ignored: expected '<op> <value>', where op 'a' adds and anything else removes");
+          continue;
+        }
+        char op = line.charAt(0);
+        String val = line.substring(2);
+        if (op == 'a'){
+          addData(val, gossipService);
+        } else {
+          removeData(val, gossipService);
+        }
+      }
+    }
+  }
+
+  /**
+   * Merges a removal of {@code val} into the shared set; does nothing when no set exists yet.
+   */
+  private static void removeData(String val, GossipService gossipService){
+    OrSet<String> s = (OrSet<String>) gossipService.getGossipManager().findCrdt(CRDT_KEY);
+    if (s == null){
+      // Nothing has been stored under the key, so there is nothing to remove; building an
+      // OrSet from a null base would presumably fail - TODO confirm against that constructor.
+      return;
+    }
+    publish(new OrSet<String>(s , new OrSet.Builder<String>().remove(val)), gossipService);
+  }
+
+  /** Merges a one-element set containing {@code val} into the shared set. */
+  private static void addData(String val, GossipService gossipService){
+    publish(new OrSet<String>(val), gossipService);
+  }
+
+  /** Wraps {@code payload} in a shared message with the maximum expiry and merges it under the key. */
+  private static void publish(OrSet<String> payload, GossipService gossipService){
+    SharedGossipDataMessage m = new SharedGossipDataMessage();
+    m.setExpireAt(Long.MAX_VALUE);
+    m.setKey(CRDT_KEY);
+    m.setPayload(payload);
+    m.setTimestamp(System.currentTimeMillis());
+    gossipService.getGossipManager().merge(m);
+  }
+}
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index dff6413..a24b125 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -114,28 +114,36 @@
@SuppressWarnings({ "unchecked", "rawtypes" })
public void addSharedData(SharedGossipDataMessage message) {
- SharedGossipDataMessage previous = sharedData.get(message.getKey());
- if (previous == null) {
- sharedData.putIfAbsent(message.getKey(), message);
- } else {
+ while (true){ // repeats until the message is stored, merged in, or found not newer than the stored one
+ SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); // stores message only when the key has no value yet
+ if (previous == null){
+ return;
+ }
if (message.getPayload() instanceof Crdt){
- SharedGossipDataMessage curretnt = sharedData.get(message.getKey());
SharedGossipDataMessage merged = new SharedGossipDataMessage();
merged.setExpireAt(message.getExpireAt());
- merged.setKey(curretnt.getKey());
+ merged.setKey(message.getKey());
merged.setNodeId(message.getNodeId());
merged.setTimestamp(message.getTimestamp());
- Crdt mergedCrdt = ((Crdt) message.getPayload()).merge((Crdt)curretnt.getPayload());
- merged.setPayload( mergedCrdt );
- sharedData.put(curretnt.getKey(), merged);
+ Crdt mergedCrdt = ((Crdt) previous.getPayload()).merge((Crdt) message.getPayload()); // stored payload is the receiver, incoming payload the argument
+ merged.setPayload(mergedCrdt);
+ boolean replaced = sharedData.replace(message.getKey(), previous, merged); // conditional swap against the value read above; presumably fails if the mapping changed meanwhile - confirm sharedData's map type
+ if (replaced){
+ return;
+ }
} else {
- if (previous.getTimestamp() < message.getTimestamp()) {
- sharedData.replace(message.getKey(), previous, message);
+ if (previous.getTimestamp() < message.getTimestamp()){ // non-Crdt payloads: only a strictly newer timestamp replaces the stored message
+ boolean result = sharedData.replace(message.getKey(), previous, message);
+ if (result){
+ return;
+ }
+ } else {
+ return; // stored message is at least as recent; keep it
}
}
}
}
-
+
public void addPerNodeData(GossipDataMessage message){
ConcurrentHashMap<String,GossipDataMessage> nodeMap = new ConcurrentHashMap<>();
nodeMap.put(message.getKey(), message);
@@ -363,8 +371,8 @@
@SuppressWarnings("rawtypes")
public Crdt merge(SharedGossipDataMessage message) {
for (;;){
- SharedGossipDataMessage ret = sharedData.putIfAbsent(message.getKey(), message);
- if (ret == null){
+ SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
+ if (previous == null){
return (Crdt) message.getPayload();
}
SharedGossipDataMessage copy = new SharedGossipDataMessage();
@@ -373,9 +381,9 @@
copy.setNodeId(message.getNodeId());
copy.setTimestamp(message.getTimestamp());
@SuppressWarnings("unchecked")
- Crdt merged = ((Crdt) ret.getPayload()).merge((Crdt) message.getPayload());
+ Crdt merged = ((Crdt) previous.getPayload()).merge((Crdt) message.getPayload());
copy.setPayload(merged);
- boolean replaced = sharedData.replace(message.getKey(), ret, copy);
+ boolean replaced = sharedData.replace(message.getKey(), previous, copy);
if (replaced){
return merged;
}
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 0140f00..4b28f2f 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -317,6 +317,7 @@
}
return gossipCore.merge(message);
}
+
public GossipDataMessage findPerNodeGossipData(String nodeId, String key){
ConcurrentHashMap<String, GossipDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
if (j == null){
diff --git a/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/src/test/java/org/apache/gossip/crdt/OrSetTest.java
index 8b8766a..e576764 100644
--- a/src/test/java/org/apache/gossip/crdt/OrSetTest.java
+++ b/src/test/java/org/apache/gossip/crdt/OrSetTest.java
@@ -102,4 +102,15 @@
Assert.assertEquals(back, i);
}
-}
+  /**
+   * Two sets that each added the same value independently keep both ids after a merge, and a
+   * later removal tombstones both ids so the value no longer appears.
+   */
+  @Test
+  public void mergeTestSame() {
+    OrSet<Integer> first = new OrSet<>(19);
+    OrSet<Integer> second = new OrSet<>(19);
+    OrSet<Integer> merged = first.merge(second);
+    Assert.assertEquals(2, merged.getElements().get(19).size());
+
+    OrSet<Integer> afterRemove = new OrSet<>(merged, new OrSet.Builder<Integer>().remove(19));
+    Assert.assertEquals(2, afterRemove.getTombstones().get(19).size());
+    Assert.assertEquals(2, afterRemove.getElements().get(19).size());
+    Assert.assertEquals(new OrSet<Integer>().value(), afterRemove.value());
+  }
+}
\ No newline at end of file