GOSSIP-71: fix OrSet merge overwriting per-element UUID sets (egc & maxim)
diff --git a/src/main/java/org/apache/gossip/crdt/OrSet.java b/src/main/java/org/apache/gossip/crdt/OrSet.java
index 972377f..f84dbc7 100644
--- a/src/main/java/org/apache/gossip/crdt/OrSet.java
+++ b/src/main/java/org/apache/gossip/crdt/OrSet.java
@@ -17,16 +17,9 @@
  */
 package org.apache.gossip.crdt;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
+import java.util.function.BiConsumer;
 
 import org.apache.gossip.crdt.OrSet.Builder.Operation;
 
@@ -86,11 +79,34 @@
     val = computeValue();
   }
 
+  static Set<UUID> mergeSets(Set<UUID> a, Set<UUID> b) {
+    // Null-safe union into a fresh set: a null argument counts as empty, inputs are never mutated.
+    Set<UUID> res = new HashSet<>();
+    if (a != null) { res.addAll(a); }
+    if (b != null) { res.addAll(b); }
+    // Returns null (not an empty set) when there are no tokens: Map.merge then drops the mapping.
+    return res.isEmpty() ? null : res;
+  }
+
+  private void internalSetMerge(Map<E, Set<UUID>> map, E key, Set<UUID> value) {
+    // Null means "no tokens to contribute"; Map.merge itself would reject a null value.
+    if (value != null) {
+      map.merge(key, value, OrSet::mergeSets);
+    }
+  }
+
   public OrSet(OrSet<E> left, OrSet<E> right){
-    elements.putAll(left.elements);
-    elements.putAll(right.elements);
-    tombstones.putAll(left.tombstones);
-    tombstones.putAll(right.tombstones);
+    // Union the UUID sets key by key so that neither side's tokens are overwritten.
+    BiConsumer<Map<E, Set<UUID>>, Map<E, Set<UUID>>> mergeInto = (target, source) ->
+        source.forEach((key, ids) -> internalSetMerge(target, key, ids));
+
+    // Added tokens: left first, then right.
+    mergeInto.accept(elements, left.elements);
+    mergeInto.accept(elements, right.elements);
+    // Tombstoned tokens, in the same order.
+    mergeInto.accept(tombstones, left.tombstones);
+    mergeInto.accept(tombstones, right.tombstones);
+
     val = computeValue();
   }
   
@@ -103,29 +119,14 @@
     return new OrSet<E>(this, other);
   }
   
-  private void internalAdd(E element){
-    Set<UUID> l = elements.get(element);
-    if (l == null){
-      Set<UUID> d = new HashSet<UUID>();
-      d.add(UUID.randomUUID());
-      elements.put(element, d);
-    } else {
-      l.add(UUID.randomUUID());
-    }
+  private void internalAdd(E element) {
+    // Tag this add with a fresh random token; the merge unions it with any tokens already present.
+    Set<UUID> freshToken = new HashSet<>(Collections.singleton(UUID.randomUUID()));
+    internalSetMerge(elements, element, freshToken);
   }
   
   private void internalRemove(E element){
-    Set<UUID> elementIds = elements.get(element);
-    if (elementIds == null){
-      //deleting elements not in the list
-      return;
-    }
-    Set<UUID> current = tombstones.get(element);
-    if (current != null){
-      current.addAll(elementIds);
-    } else {
-      tombstones.put(element, elementIds);
-    }
+    internalSetMerge(tombstones, element, elements.get(element)); // tombstone every token currently held for element; no-op (null lookup) if it was never added
   }
 
   /*
@@ -134,18 +135,10 @@
   private Set<E> computeValue(){
     Set<E> values = new HashSet<>();
     for (Entry<E, Set<UUID>> entry: elements.entrySet()){
-      if (entry.getValue() == null || entry.getValue().size() == 0){
-        continue;
-      }
       Set<UUID> deleteIds = tombstones.get(entry.getKey());
-      if (deleteIds == null){
+      // if not all tokens for current element are in tombstones
+      if (deleteIds == null || !deleteIds.containsAll(entry.getValue())) {
         values.add(entry.getKey());
-      } else {
-        if (!deleteIds.containsAll(entry.getValue())){
-          values.add(entry.getKey());
-        } else {
-          //if all the entry uuid is deleted the entry is deleted
-        }
       }
     }
     return values;
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
new file mode 100644
index 0000000..d1c1751
--- /dev/null
+++ b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.examples;
+
+import com.codahale.metrics.MetricRegistry;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.crdt.OrSet;
+import org.apache.gossip.model.SharedGossipDataMessage;
+
+/** Demo node: stdin lines "a <value>" add to, and any other "<op> <value>" remove from, the OrSet shared under key "abc". */
+public class StandAloneNodeCrdtOrSet {
+  public static void main (String [] args) throws InterruptedException, IOException{ // args: own URI, own id, seed URI, seed id (ids presumed from position - confirm)
+    GossipSettings s = new GossipSettings();
+    s.setWindowSize(10);
+    s.setConvictThreshold(1.0);
+    s.setGossipInterval(10);
+    GossipService gossipService = new GossipService("mycluster",  URI.create(args[0]), args[1], new HashMap<String, String>(),
+            Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry());
+    gossipService.start();
+    new Thread(() -> { // status printer: membership and the "abc" CRDT every two seconds
+      while (true){
+        System.out.println("Live: " + gossipService.getGossipManager().getLiveMembers());
+        System.out.println("Dead: " + gossipService.getGossipManager().getDeadMembers());
+        System.out.println("---------- " + (gossipService.getGossipManager().findCrdt("abc") == null ? "":
+            gossipService.getGossipManager().findCrdt("abc").value()));
+        System.out.println("********** " + gossipService.getGossipManager().findCrdt("abc"));
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException ignored) { /* deliberate best effort: nothing to clean up, just print again */ }
+      }
+    }).start();
+    // Command loop: runs until stdin reaches EOF.
+    try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))){
+      for (String line; (line = br.readLine()) != null; ){
+        System.out.println(line);
+        if (line.length() < 2){
+          System.out.println("expected: <op> <value>"); // charAt(0)/substring(2) would throw on such a short line
+        } else if (line.charAt(0) == 'a'){
+          addData(line.substring(2), gossipService);
+        } else {
+          removeData(line.substring(2), gossipService);
+        }
+      }
+    }
+  }
+  /** Merges a removal of val into the shared set; does nothing while no CRDT is stored under "abc". */
+  private static void removeData(String val, GossipService gossipService){
+    OrSet<String> s = (OrSet<String>) gossipService.getGossipManager().findCrdt("abc");
+    if (s == null){ return; }
+    SharedGossipDataMessage m = new SharedGossipDataMessage();
+    m.setExpireAt(Long.MAX_VALUE);
+    m.setKey("abc");
+    m.setPayload(new OrSet<String>(s , new OrSet.Builder<String>().remove(val)));
+    m.setTimestamp(System.currentTimeMillis());
+    gossipService.getGossipManager().merge(m);
+  }
+  /** Wraps val in a single-element OrSet and merges it into the shared data under "abc". */
+  private static void addData(String val, GossipService gossipService){
+    SharedGossipDataMessage m = new SharedGossipDataMessage();
+    m.setExpireAt(Long.MAX_VALUE);
+    m.setKey("abc");
+    m.setPayload(new OrSet<String>(val));
+    m.setTimestamp(System.currentTimeMillis());
+    gossipService.getGossipManager().merge(m);
+  }
+}
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index dff6413..a24b125 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -114,28 +114,36 @@
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public void addSharedData(SharedGossipDataMessage message) {
-    SharedGossipDataMessage previous = sharedData.get(message.getKey());
-    if (previous == null) {
-      sharedData.putIfAbsent(message.getKey(), message);
-    } else {
+    while (true){ // retry until our write lands or turns out to be unnecessary
+      SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
+      if (previous == null){ // key was absent: message is now stored as-is
+        return;
+      }
       if (message.getPayload() instanceof Crdt){
-        SharedGossipDataMessage curretnt = sharedData.get(message.getKey());
         SharedGossipDataMessage merged = new SharedGossipDataMessage();
         merged.setExpireAt(message.getExpireAt());
-        merged.setKey(curretnt.getKey());
+        merged.setKey(message.getKey());
         merged.setNodeId(message.getNodeId());
         merged.setTimestamp(message.getTimestamp());
-        Crdt mergedCrdt = ((Crdt) message.getPayload()).merge((Crdt)curretnt.getPayload());
-        merged.setPayload( mergedCrdt );
-        sharedData.put(curretnt.getKey(), merged);
+        Crdt mergedCrdt = ((Crdt) previous.getPayload()).merge((Crdt) message.getPayload()); // stored payload merges in the incoming one; assumes previous also holds a Crdt
+        merged.setPayload(mergedCrdt);
+        boolean replaced = sharedData.replace(message.getKey(), previous, merged); // swaps only if the entry is still 'previous'; otherwise loop and re-merge
+        if (replaced){
+          return;
+        }
       } else {
-        if (previous.getTimestamp() < message.getTimestamp()) {
-          sharedData.replace(message.getKey(), previous, message);
+        if (previous.getTimestamp() < message.getTimestamp()){ // non-CRDT payloads: strictly newer timestamp wins
+          boolean result = sharedData.replace(message.getKey(), previous, message);
+          if (result){
+            return;
+          }
+        } else { // incoming message is not newer: keep what is stored
+          return;
         }
       }
     }
   }
-
+  
   public void addPerNodeData(GossipDataMessage message){
     ConcurrentHashMap<String,GossipDataMessage> nodeMap = new ConcurrentHashMap<>();
     nodeMap.put(message.getKey(), message);
@@ -363,8 +371,8 @@
   @SuppressWarnings("rawtypes")
   public Crdt merge(SharedGossipDataMessage message) {
     for (;;){
-      SharedGossipDataMessage ret = sharedData.putIfAbsent(message.getKey(), message);
-      if (ret == null){
+      SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
+      if (previous == null){
         return (Crdt) message.getPayload();
       }
       SharedGossipDataMessage copy = new SharedGossipDataMessage();
@@ -373,9 +381,9 @@
       copy.setNodeId(message.getNodeId());
       copy.setTimestamp(message.getTimestamp());
       @SuppressWarnings("unchecked")
-      Crdt merged = ((Crdt) ret.getPayload()).merge((Crdt) message.getPayload());
+      Crdt merged = ((Crdt) previous.getPayload()).merge((Crdt) message.getPayload());
       copy.setPayload(merged);
-      boolean replaced = sharedData.replace(message.getKey(), ret, copy);
+      boolean replaced = sharedData.replace(message.getKey(), previous, copy);
       if (replaced){
         return merged;
       }
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 0140f00..4b28f2f 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -317,6 +317,7 @@
     }
     return gossipCore.merge(message);
   }
+  
   public GossipDataMessage findPerNodeGossipData(String nodeId, String key){
     ConcurrentHashMap<String, GossipDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
     if (j == null){
diff --git a/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/src/test/java/org/apache/gossip/crdt/OrSetTest.java
index 8b8766a..e576764 100644
--- a/src/test/java/org/apache/gossip/crdt/OrSetTest.java
+++ b/src/test/java/org/apache/gossip/crdt/OrSetTest.java
@@ -102,4 +102,15 @@
     Assert.assertEquals(back, i);
   }
   
-}
+  @Test
+  public void mergeTestSame() { // GOSSIP-71 regression: the same value held by two separately built sets
+    OrSet<Integer> i = new OrSet<>(19);
+    OrSet<Integer> j = new OrSet<>(19);
+    OrSet<Integer> k = i.merge(j);
+    Assert.assertEquals(2, k.getElements().get(19).size()); // tokens from both sides must survive the merge
+    OrSet<Integer> y = new OrSet<>(k, new OrSet.Builder<Integer>().remove(19));
+    Assert.assertEquals(2, y.getTombstones().get(19).size()); // the remove tombstones both tokens
+    Assert.assertEquals(2, y.getElements().get(19).size()); // element tokens are retained alongside their tombstones
+    Assert.assertEquals(new OrSet<Integer>().value(), y.value()); // every token tombstoned -> same value as an empty OrSet
+  }
+}
\ No newline at end of file