GOSSIP-64 Implement Max-Change-Sets
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtAddRemoveSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtAddRemoveSet.java
new file mode 100644
index 0000000..55ba019
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtAddRemoveSet.java
@@ -0,0 +1,12 @@
+package org.apache.gossip.crdt;
+
+import java.util.Set;
+
+// Interface extends CrdtSet interface with add and remove operation that are guaranteed to be immutable.
+// If your implementation provide immutable add/remove operations you can extend AbstractCRDTStringSetTest to check it in the most ways.
+
+public interface CrdtAddRemoveSet<T, SetType extends Set<T>, R extends CrdtAddRemoveSet<T, SetType, R>> extends CrdtSet<T, SetType, R> {
+  R add(T element);
+
+  R remove(T element);
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
index 1c95b28..7ec96e7 100644
--- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
@@ -17,16 +17,16 @@
  */
 package org.apache.gossip.crdt;
 
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.Version;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
 abstract class OrSetMixin<E> {
   @JsonCreator
   OrSetMixin(@JsonProperty("elements") Map<E, Set<UUID>> w, @JsonProperty("tombstones") Map<E, Set<UUID>> h) { }
@@ -37,8 +37,8 @@
 
 abstract class LWWSetMixin<ElementType> {
   @JsonCreator
-  LWWSetMixin(@JsonProperty("data") Map<ElementType, LWWSet.Timestamps> struct) { }
-  @JsonProperty("data") abstract Map<ElementType, LWWSet.Timestamps> getStruct();
+  LWWSetMixin(@JsonProperty("data") Map<ElementType, LwwSet.Timestamps> struct) { }
+  @JsonProperty("data") abstract Map<ElementType, LwwSet.Timestamps> getStruct();
 }
 
 abstract class LWWSetTimestampsMixin {
@@ -48,6 +48,12 @@
   @JsonProperty("remove") abstract long getLatestRemove();
 }
 
+abstract class MaxChangeSetMixin<E> {
+  @JsonCreator
+  MaxChangeSetMixin(@JsonProperty("data") Map<E, Integer> struct) { }
+  @JsonProperty("data") abstract Map<E, Integer> getStruct();
+}
+
 abstract class GrowOnlySetMixin<E>{
   @JsonCreator
   GrowOnlySetMixin(@JsonProperty("elements") Set<E> elements){ }
@@ -84,8 +90,9 @@
     context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class);
     context.setMixInAnnotations(GrowOnlyCounter.class, GrowOnlyCounterMixin.class);
     context.setMixInAnnotations(PNCounter.class, PNCounterMixin.class);
-    context.setMixInAnnotations(LWWSet.class, LWWSetMixin.class);
-    context.setMixInAnnotations(LWWSet.Timestamps.class, LWWSetTimestampsMixin.class);
+    context.setMixInAnnotations(LwwSet.class, LWWSetMixin.class);
+    context.setMixInAnnotations(LwwSet.Timestamps.class, LWWSetTimestampsMixin.class);
+    context.setMixInAnnotations(MaxChangeSet.class, MaxChangeSetMixin.class);
   }
 
 }
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/LWWSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/LwwSet.java
similarity index 67%
rename from gossip-base/src/main/java/org/apache/gossip/crdt/LWWSet.java
rename to gossip-base/src/main/java/org/apache/gossip/crdt/LwwSet.java
index b51ce7a..391cb09 100644
--- a/gossip-base/src/main/java/org/apache/gossip/crdt/LWWSet.java
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/LwwSet.java
@@ -20,12 +20,31 @@
 import org.apache.gossip.manager.Clock;
 import org.apache.gossip.manager.SystemClock;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-public class LWWSet<ElementType> implements CrdtSet<ElementType, Set<ElementType>, LWWSet<ElementType>> {
+/*
+  Last write wins CrdtSet
+  Each operation has timestamp: when you add or remove SystemClock is used to get current time in nanoseconds.
+  When all add/remove operations are within the only node LWWSet is guaranteed to work like a Set.
+  If you have multiple nodes with ideally synchronized clocks:
+    You will observe operations on all machines later than on the initiator, but the last operations on cluster will win.
+  If you have some significant clock drift you will suffer from data loss.
+
+  Read more: https://github.com/aphyr/meangirls#lww-element-set
+
+  You can view examples of usage in tests:
+  LwwSetTest - unit tests
+  DataTest - integration test with 2 nodes, LWWSet was serialized/deserialized, sent between nodes, merged
+*/
+
+public class LwwSet<ElementType> implements CrdtAddRemoveSet<ElementType, Set<ElementType>, LwwSet<ElementType>> {
   static private Clock clock = new SystemClock();
 
   private final Map<ElementType, Timestamps> struct;
@@ -44,11 +63,11 @@
       latestRemove = remove;
     }
 
-    long getLatestAdd() {
+    long getLatestAdd(){
       return latestAdd;
     }
 
-    long getLatestRemove() {
+    long getLatestRemove(){
       return latestRemove;
     }
 
@@ -74,23 +93,23 @@
   }
 
 
-  public LWWSet(){
+  public LwwSet(){
     struct = new HashMap<>();
   }
 
   @SafeVarargs
-  public LWWSet(ElementType... elements){
+  public LwwSet(ElementType... elements){
     this(new HashSet<>(Arrays.asList(elements)));
   }
 
-  public LWWSet(Set<ElementType> set){
+  public LwwSet(Set<ElementType> set){
     struct = new HashMap<>();
     for (ElementType e : set){
       struct.put(e, new Timestamps().updateAdd());
     }
   }
 
-  public LWWSet(LWWSet<ElementType> first, LWWSet<ElementType> second){
+  public LwwSet(LwwSet<ElementType> first, LwwSet<ElementType> second){
     Function<ElementType, Timestamps> timestampsFor = p -> {
       Timestamps firstTs = first.struct.get(p);
       Timestamps secondTs = second.struct.get(p);
@@ -103,33 +122,33 @@
         .distinct().collect(Collectors.toMap(p -> p, timestampsFor));
   }
 
-  public LWWSet<ElementType> add(ElementType e){
-    return this.merge(new LWWSet<>(e));
+  public LwwSet<ElementType> add(ElementType e){
+    return this.merge(new LwwSet<>(e));
   }
 
   // for serialization
-  LWWSet(Map<ElementType, Timestamps> struct){
+  LwwSet(Map<ElementType, Timestamps> struct){
     this.struct = struct;
   }
 
-  Map<ElementType, Timestamps> getStruct() {
+  Map<ElementType, Timestamps> getStruct(){
     return struct;
   }
 
 
-  public LWWSet<ElementType> remove(ElementType e){
+  public LwwSet<ElementType> remove(ElementType e){
     Timestamps eTimestamps = struct.get(e);
     if (eTimestamps == null || !eTimestamps.isPresent()){
       return this;
     }
     Map<ElementType, Timestamps> changeMap = new HashMap<>();
     changeMap.put(e, eTimestamps.updateRemove());
-    return this.merge(new LWWSet<>(changeMap));
+    return this.merge(new LwwSet<>(changeMap));
   }
 
   @Override
-  public LWWSet<ElementType> merge(LWWSet<ElementType> other){
-    return new LWWSet<>(this, other);
+  public LwwSet<ElementType> merge(LwwSet<ElementType> other){
+    return new LwwSet<>(this, other);
   }
 
   @Override
@@ -141,12 +160,12 @@
   }
 
   @Override
-  public LWWSet<ElementType> optimize(){
+  public LwwSet<ElementType> optimize(){
     return this;
   }
 
   @Override
   public boolean equals(Object obj){
-    return this == obj || (obj != null && getClass() == obj.getClass() && value().equals(((LWWSet) obj).value()));
+    return this == obj || (obj != null && getClass() == obj.getClass() && value().equals(((LwwSet) obj).value()));
   }
 }
\ No newline at end of file
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/MaxChangeSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/MaxChangeSet.java
new file mode 100644
index 0000000..0b72b80
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/MaxChangeSet.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/*
+  Max Change Set CrdtSet. Value which has changed the most wins.
+  You cannot delete an element which is not present, and cannot add an element which is already present.
+   MC-sets are compact and do the right thing when changes to elements are infrequent compared to the gossiping period.
+
+  Read more: https://github.com/aphyr/meangirls#max-change-sets
+  You can view examples of usage in tests:
+  MaxChangeSetTest - unit tests
+  DataTest - integration test with 2 nodes, MaxChangeSet was serialized/deserialized, sent between nodes, merged
+*/
+
+public class MaxChangeSet<ElementType> implements CrdtAddRemoveSet<ElementType, Set<ElementType>, MaxChangeSet<ElementType>> {
+  private final Map<ElementType, Integer> struct;
+
+  public MaxChangeSet(){
+    struct = new HashMap<>();
+  }
+
+  @SafeVarargs
+  public MaxChangeSet(ElementType... elements){
+    this(new HashSet<>(Arrays.asList(elements)));
+  }
+
+  public MaxChangeSet(Set<ElementType> set){
+    struct = new HashMap<>();
+    for (ElementType e : set){
+      struct.put(e, 1);
+    }
+  }
+
+  public MaxChangeSet(MaxChangeSet<ElementType> first, MaxChangeSet<ElementType> second){
+    Function<ElementType, Integer> valueFor = element ->
+        Math.max(first.struct.getOrDefault(element, 0), second.struct.getOrDefault(element, 0));
+    struct = Stream.concat(first.struct.keySet().stream(), second.struct.keySet().stream())
+        .distinct().collect(Collectors.toMap(p -> p, valueFor));
+  }
+
+  // for serialization
+  MaxChangeSet(Map<ElementType, Integer> struct){
+    this.struct = struct;
+  }
+
+  Map<ElementType, Integer> getStruct(){
+    return struct;
+  }
+
+  private MaxChangeSet<ElementType> increment(ElementType e){
+    Map<ElementType, Integer> changeMap = new HashMap<>();
+    changeMap.put(e, struct.getOrDefault(e, 0) + 1);
+    return this.merge(new MaxChangeSet<>(changeMap));
+  }
+
+  public MaxChangeSet<ElementType> add(ElementType e){
+    if (struct.getOrDefault(e, 0) % 2 == 1){
+      return this;
+    }
+    return increment(e);
+  }
+
+  public MaxChangeSet<ElementType> remove(ElementType e){
+    if (struct.getOrDefault(e, 0) % 2 == 0){
+      return this;
+    }
+    return increment(e);
+  }
+
+  @Override
+  public MaxChangeSet<ElementType> merge(MaxChangeSet<ElementType> other){
+    return new MaxChangeSet<>(this, other);
+  }
+
+  @Override
+  public Set<ElementType> value(){
+    return struct.entrySet().stream()
+        .filter(entry -> (entry.getValue() % 2 == 1))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toSet());
+  }
+
+  @Override
+  public MaxChangeSet<ElementType> optimize(){
+    return this;
+  }
+
+  @Override
+  public boolean equals(Object obj){
+    return this == obj || (obj != null && getClass() == obj.getClass() && value().equals(((MaxChangeSet) obj).value()));
+  }
+}
\ No newline at end of file
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/OrSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/OrSet.java
index f84dbc7..68b089a 100644
--- a/gossip-base/src/main/java/org/apache/gossip/crdt/OrSet.java
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/OrSet.java
@@ -26,7 +26,7 @@
 /*
  * A immutable set 
  */
-public class OrSet<E>  implements Crdt<Set<E>, OrSet<E>> {
+public class OrSet<E>  implements CrdtAddRemoveSet<E, Set<E>, OrSet<E>> {
   
   private final Map<E, Set<UUID>> elements = new HashMap<>();
   private final Map<E, Set<UUID>> tombstones = new HashMap<>();
@@ -44,6 +44,10 @@
   
   @SafeVarargs
   public OrSet(E ... elements){
+    this(new HashSet<>(Arrays.asList(elements)));
+  }
+
+  public OrSet(Set<E> elements) {
     for (E e: elements){
       internalAdd(e);
     }
@@ -109,7 +113,15 @@
 
     val = computeValue();
   }
-  
+
+  public OrSet<E> add(E e) {
+    return this.merge(new OrSet<>(e));
+  }
+
+  public OrSet<E> remove(E e) {
+    return new OrSet<>(this, new Builder<E>().remove(e));
+  }
+
   public OrSet.Builder<E> builder(){
     return new OrSet.Builder<>();
   }
@@ -233,15 +245,6 @@
     return value().toArray(a);
   }
 
-  public boolean add(E e) {
-    throw new IllegalArgumentException("Can not add");
-  }
-
-
-  public boolean remove(Object o) {
-    throw new IllegalArgumentException();
-  }
-
   public boolean containsAll(Collection<?> c) {
     return this.value().containsAll(c);
   }
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java
new file mode 100644
index 0000000..d4db4ce
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unle<F4>ss required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.Ignore;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/*
+  Abstract test suit to test CrdtSets with Add and Remove operations.
+  It compares them with simple sets, validates add, remove, equals, value, etc. operations
+  To use it you should:
+  1. subclass this and implement constructors
+  2. implement CrdtAddRemoveSet in your CrdtSet
+  3. make your CrdtSet immutable
+*/
+
+@Ignore
+public abstract class AbstractCRDTStringSetTest<SetType extends CrdtAddRemoveSet<String, Set<String>, SetType>> {
+  abstract SetType construct(Set<String> set);
+
+  abstract SetType construct();
+
+  private Set<String> sampleSet;
+
+  @Before
+  public void setup(){
+    sampleSet = new HashSet<>();
+    sampleSet.add("4");
+    sampleSet.add("5");
+    sampleSet.add("12");
+  }
+
+  @Test
+  public void abstractSetConstructorTest(){
+    Assert.assertEquals(construct(sampleSet).value(), sampleSet);
+  }
+
+  @Test
+  public void abstractStressWithSetTest(){
+    Set<String> hashSet = new HashSet<>();
+    SetType set = construct();
+    for (int it = 0; it < 40; it++){
+      SetType newSet;
+      if (it % 5 == 1){
+        //deleting existing
+        String forDelete = hashSet.stream().skip((long) (hashSet.size() * Math.random())).findFirst().get();
+        newSet = set.remove(forDelete);
+        Assert.assertEquals(set.value(), hashSet); // check old version is immutable
+        hashSet.remove(forDelete);
+      } else {
+        //adding
+        String forAdd = String.valueOf((int) (10000 * Math.random()));
+        newSet = set.add(forAdd);
+        Assert.assertEquals(set.value(), hashSet); // check old version is immutable
+        hashSet.add(forAdd);
+      }
+      set = newSet;
+      Assert.assertEquals(set.value(), hashSet);
+    }
+  }
+
+  @Test
+  public void abstractEqualsTest(){
+    SetType set = construct(sampleSet);
+    Assert.assertFalse(set.equals(sampleSet));
+    SetType newSet = set.add("25");
+    sampleSet.add("25");
+    Assert.assertFalse(newSet.equals(set));
+    Assert.assertEquals(construct(sampleSet), newSet);
+  }
+
+  @Test
+  public void abstractRemoveMissingTest(){
+    SetType set = construct(sampleSet);
+    set = set.add("25");
+    set = set.remove("25");
+    Assert.assertEquals(set.value(), sampleSet);
+    set = set.remove("25");
+    set = set.add("25");
+    sampleSet.add("25");
+    Assert.assertEquals(set.value(), sampleSet);
+  }
+
+  @Test
+  public void abstractStressMergeTest(){
+    // in one-process context, add, remove and merge operations of lww are equal to operations of Set
+    // we've already checked it. Now just check merge
+    Set<String> hashSet1 = new HashSet<>(), hashSet2 = new HashSet<>();
+    SetType set1 = construct(), set2 = construct();
+
+    for (int it = 0; it < 100; it++){
+      String forAdd = String.valueOf((int) (10000 * Math.random()));
+      if (it % 2 == 0){
+        hashSet1.add(forAdd);
+        set1 = set1.add(forAdd);
+      } else {
+        hashSet2.add(forAdd);
+        set2 = set2.add(forAdd);
+      }
+    }
+    Assert.assertEquals(set1.value(), hashSet1);
+    Assert.assertEquals(set2.value(), hashSet2);
+    Set<String> mergedSet = Stream.concat(hashSet1.stream(), hashSet2.stream()).collect(Collectors.toSet());
+    Assert.assertEquals(set1.merge(set2).value(), mergedSet);
+  }
+
+  @Test
+  public void abstractOptimizeTest(){
+    Assert.assertEquals(construct(sampleSet).value(), sampleSet);
+    Assert.assertEquals(construct(sampleSet).optimize().value(), sampleSet);
+  }
+}
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/LWWSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/LWWSetTest.java
deleted file mode 100644
index bdd3258..0000000
--- a/gossip-base/src/test/java/org/apache/gossip/crdt/LWWSetTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.crdt;
-
-import org.apache.gossip.manager.Clock;
-import org.apache.gossip.manager.SystemClock;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-public class LWWSetTest {
-  static private Clock clock = new SystemClock();
-  private Set<Integer> sampleSet;
-
-  @Before
-  public void setup(){
-    sampleSet = new HashSet<>();
-    sampleSet.add(4);
-    sampleSet.add(5);
-    sampleSet.add(12);
-  }
-
-  @Test
-  public void setConstructorTest(){
-    Assert.assertEquals(new LWWSet<>(sampleSet).value(), sampleSet);
-  }
-
-  @Test
-  public void stressWithSetTest(){
-    Set<Integer> set = new HashSet<>();
-    LWWSet<Integer> lww = new LWWSet<>();
-    for (int it = 0; it < 100; it++){
-      LWWSet<Integer> newLww;
-      if (it % 5 == 1){
-        //deleting existing
-        Integer forDelete = set.stream().skip((long) (set.size() * Math.random())).findFirst().get();
-        newLww = lww.remove(forDelete);
-        Assert.assertEquals(lww.value(), set); // check old version is immutable
-        set.remove(forDelete);
-      } else {
-        //adding
-        Integer forAdd = (int) (10000 * Math.random());
-        newLww = lww.add(forAdd);
-        Assert.assertEquals(lww.value(), set); // check old version is immutable
-        set.add(forAdd);
-      }
-      lww = newLww;
-      Assert.assertEquals(lww.value(), set);
-    }
-  }
-
-  @Test
-  public void equalsTest(){
-    LWWSet<Integer> lww = new LWWSet<>(sampleSet);
-    Assert.assertFalse(lww.equals(sampleSet));
-    LWWSet<Integer> newLww = lww.add(25);
-    sampleSet.add(25);
-    Assert.assertFalse(newLww.equals(lww));
-    Assert.assertEquals(new LWWSet<>(sampleSet), newLww);
-  }
-
-  @Test
-  public void valueTest() {
-    Map<Character, LWWSet.Timestamps> map = new HashMap<>();
-    map.put('a', new LWWSet.Timestamps(1, 0));
-    map.put('b', new LWWSet.Timestamps(1, 2));
-    map.put('c', new LWWSet.Timestamps(3, 3));
-    Set<Character> toTest = new HashSet<>();
-    toTest.add('a'); // for 'a' addTime > removeTime
-    toTest.add('c'); // for 'c' times are equal, we prefer add to remove
-    Assert.assertEquals(new LWWSet<>(map).value(), toTest);
-    Assert.assertEquals(new LWWSet<>(map), new LWWSet<>('a', 'c'));
-  }
-
-  @Test
-  public void removeMissingTest(){
-    LWWSet<Integer> lww = new LWWSet<>(sampleSet);
-    lww = lww.add(25);
-    lww = lww.remove(25);
-    Assert.assertEquals(lww.value(), sampleSet);
-    lww = lww.remove(25);
-    lww = lww.add(25);
-    sampleSet.add(25);
-    Assert.assertEquals(lww.value(), sampleSet);
-  }
-
-  @Test
-  public void stressMergeTest(){
-    // in one-process context, add, remove and merge operations of lww are equal to operations of Set
-    // we've already checked it. Now just check merge
-    Set<Integer> set1 = new HashSet<>(), set2 = new HashSet<>();
-    LWWSet<Integer> lww1 = new LWWSet<>(), lww2 = new LWWSet<>();
-
-    for (int it = 0; it < 100; it++){
-      Integer forAdd = (int) (10000 * Math.random());
-      if (it % 2 == 0){
-        set1.add(forAdd);
-        lww1 = lww1.add(forAdd);
-      } else {
-        set2.add(forAdd);
-        lww2 = lww2.add(forAdd);
-      }
-    }
-    Assert.assertEquals(lww1.value(), set1);
-    Assert.assertEquals(lww2.value(), set2);
-    Set<Integer> mergedSet = Stream.concat(set1.stream(), set2.stream()).collect(Collectors.toSet());
-    Assert.assertEquals(lww1.merge(lww2).value(), mergedSet);
-  }
-
-  @Test
-  public void fakeTimeMergeTest(){
-    // try to create LWWSet with time from future (simulate other process with its own clock) and validate result
-    // check remove from the future
-    Map<Integer, LWWSet.Timestamps> map = new HashMap<>();
-    map.put(25, new LWWSet.Timestamps(clock.nanoTime(), clock.nanoTime() + 100000));
-    LWWSet<Integer> lww = new LWWSet<>(map);
-    Assert.assertEquals(lww, new LWWSet<Integer>());
-    //create new LWWSet with element 25, and merge with other LWW which has remove in future
-    Assert.assertEquals(new LWWSet<>(25).merge(lww), new LWWSet<Integer>());
-
-    // add in future
-    map.put(25, new LWWSet.Timestamps(clock.nanoTime() + 100000, 0));
-    lww = new LWWSet<>(map);
-    lww = lww.remove(25);
-    Assert.assertEquals(lww, new LWWSet<>(25)); // 25 is still here
-  }
-
-  @Test
-  public void optimizeTest(){
-    Assert.assertEquals(new LWWSet<>(sampleSet).value(), sampleSet);
-    Assert.assertEquals(new LWWSet<>(sampleSet).optimize().value(), sampleSet);
-  }
-}
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java
new file mode 100644
index 0000000..8200b15
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import org.apache.gossip.manager.Clock;
+import org.apache.gossip.manager.SystemClock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class LwwSetTest extends AbstractCRDTStringSetTest<LwwSet<String>> {
+  static private Clock clock = new SystemClock();
+
+  LwwSet<String> construct(Set<String> set){
+    return new LwwSet<>(set);
+  }
+
+  LwwSet<String> construct(){
+    return new LwwSet<>();
+  }
+
+  @Test
+  public void valueTest(){
+    Map<Character, LwwSet.Timestamps> map = new HashMap<>();
+    map.put('a', new LwwSet.Timestamps(1, 0));
+    map.put('b', new LwwSet.Timestamps(1, 2));
+    map.put('c', new LwwSet.Timestamps(3, 3));
+    Set<Character> toTest = new HashSet<>();
+    toTest.add('a'); // for 'a' addTime > removeTime
+    toTest.add('c'); // for 'c' times are equal, we prefer add to remove
+    Assert.assertEquals(new LwwSet<>(map).value(), toTest);
+    Assert.assertEquals(new LwwSet<>(map), new LwwSet<>('a', 'c'));
+  }
+
+  @Test
+  public void fakeTimeMergeTest(){
+    // try to create LWWSet with time from future (simulate other process with its own clock) and validate result
+    // check remove from the future
+    Map<Integer, LwwSet.Timestamps> map = new HashMap<>();
+    map.put(25, new LwwSet.Timestamps(clock.nanoTime(), Long.MAX_VALUE));
+    LwwSet<Integer> lww = new LwwSet<>(map);
+    Assert.assertEquals(lww, new LwwSet<Integer>());
+    //create new LWWSet with element 25, and merge with other LWW which has remove in future
+    Assert.assertEquals(new LwwSet<>(25).merge(lww), new LwwSet<Integer>());
+
+    // add in future
+    map.put(25, new LwwSet.Timestamps(Long.MAX_VALUE, 0));
+    lww = new LwwSet<>(map);
+    lww = lww.remove(25);
+    Assert.assertEquals(lww, new LwwSet<>(25)); // 25 is still here
+  }
+}
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java
new file mode 100644
index 0000000..2ba3f09
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class MaxChangeSetTest extends AbstractCRDTStringSetTest<MaxChangeSet<String>> {
+  MaxChangeSet<String> construct(Set<String> set){
+    return new MaxChangeSet<>(set);
+  }
+
+  MaxChangeSet<String> construct(){
+    return new MaxChangeSet<>();
+  }
+
+  @Test
+  public void valueTest(){
+    Map<Character, Integer> struct = new HashMap<>();
+    struct.put('a', 0);
+    struct.put('b', 1);
+    struct.put('c', 2);
+    struct.put('d', 3);
+    Set<Character> result = new HashSet<>();
+    result.add('b');
+    result.add('d');
+    Assert.assertEquals(new MaxChangeSet<>(struct).value(), result);
+  }
+
+  @Test
+  public void mergeTest(){
+    MaxChangeSet<Integer> set1 = new MaxChangeSet<Integer>().add(1); // Set with one operation on 1
+    MaxChangeSet<Integer> set2 = new MaxChangeSet<Integer>().add(1).remove(1); // two operations
+    Assert.assertEquals(set1.merge(set2), new MaxChangeSet<Integer>()); // empty set wins
+
+    set1 = set1.add(1).add(1).add(1);
+    // empty set still wins, repetitive operations do nothing, don't increase number of operations
+    Assert.assertEquals(set1.merge(set2), new MaxChangeSet<Integer>());
+
+    set1 = set1.remove(1).add(1); // 3 operations
+    Assert.assertEquals(set1.merge(set2), new MaxChangeSet<>(1)); // full set wins now
+
+    set2 = set2.remove(1).remove(1).remove(1);
+    // full set still wins, repetitive removes don't increase number of operations too
+    Assert.assertEquals(set1.merge(set2), new MaxChangeSet<>(1));
+  }
+}
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
index 70c0d51..bdaada9 100644
--- a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
@@ -17,36 +17,44 @@
  */
 package org.apache.gossip.crdt;
 
-import java.util.Arrays;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
 import org.junit.Assert;
 import org.junit.Test;
 
-public class OrSetTest {
+import java.util.Arrays;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class OrSetTest extends AbstractCRDTStringSetTest<OrSet<String>> {
+  OrSet<String> construct(){
+    return new OrSet<>();
+  }
+
+  OrSet<String> construct(Set<String> set){
+    return new OrSet<>(set);
+  }
 
   @Test
-  public void atest() {
+  public void atest(){
     OrSet<Integer> i = new OrSet<>(new OrSet.Builder<Integer>().add(4).add(5).add(6).remove(5));
     Assert.assertArrayEquals(Arrays.asList(4, 6).toArray(), i.value().toArray());
   }
-    
+
   @Test
   public void mergeTest(){
     OrSet<Integer> i = new OrSet<>(new OrSet.Builder<Integer>().add(4).add(5).add(6).remove(5));
     Assert.assertArrayEquals(Arrays.asList(4, 6).toArray(), i.value().toArray());
     OrSet<Integer> j = new OrSet<>(new OrSet.Builder<Integer>().add(9).add(4).add(5).remove(6));
     OrSet<Integer> h = i.merge(j);
-    Assert.assertEquals(new OrSet<Integer>(4,6,9,5), h);
+    Assert.assertEquals(new OrSet<>(4, 6, 9, 5), h);
   }
-  
+
   @Test
   public void mergeTest2(){
     OrSet<Integer> i = new OrSet<>(new OrSet.Builder<Integer>().add(5).add(4).remove(4).add(6));
-    Assert.assertEquals(new OrSet<Integer>(5,6), i);
+    Assert.assertEquals(new OrSet<>(5, 6), i);
     SortedSet<Integer> tree = new TreeSet<>();
-    for (Integer in: i.value()){
+    for (Integer in : i.value()){
       tree.add(in);
     }
     TreeSet<Integer> compare = new TreeSet<>();
@@ -54,34 +62,34 @@
     compare.add(6);
     Assert.assertEquals(tree, compare);
   }
-  
+
   @Test
-  public void mergeTest4() {
-    Assert.assertArrayEquals(new Integer[] {},
-            new OrSet<Integer>(new OrSet.Builder<Integer>().add(1).remove(1)).toArray());
+  public void mergeTest4(){
+    Assert.assertArrayEquals(new Integer[]{},
+        new OrSet<>(new OrSet.Builder<Integer>().add(1).remove(1)).toArray());
   }
-  
+
   @Test
   public void mergeTest3(){
     OrSet<Integer> i = new OrSet<>(1);
     OrSet<Integer> j = new OrSet<>(2);
-    OrSet<Integer> k = new OrSet<>(i.merge(j),  new OrSet.Builder<Integer>().remove(1));
-    Assert.assertArrayEquals(new Integer[] { 2 }, i.merge(j).merge(k).toArray());
-    Assert.assertArrayEquals(new Integer[] { 2 }, j.merge(i).merge(k).toArray());
-    Assert.assertArrayEquals(new Integer[] { 2 }, k.merge(i).merge(j).toArray());
-    Assert.assertArrayEquals(new Integer[] { 2 }, k.merge(j).merge(i).toArray());
-    Assert.assertEquals(j , i.merge(j.merge(k)));
+    OrSet<Integer> k = new OrSet<>(i.merge(j), new OrSet.Builder<Integer>().remove(1));
+    Assert.assertArrayEquals(new Integer[]{2}, i.merge(j).merge(k).toArray());
+    Assert.assertArrayEquals(new Integer[]{2}, j.merge(i).merge(k).toArray());
+    Assert.assertArrayEquals(new Integer[]{2}, k.merge(i).merge(j).toArray());
+    Assert.assertArrayEquals(new Integer[]{2}, k.merge(j).merge(i).toArray());
+    Assert.assertEquals(j, i.merge(j.merge(k)));
   }
-  
+
   @Test
   public void mergeTest9(){
     OrSet<Integer> i = new OrSet<>(19);
     OrSet<Integer> j = i.merge(i);
     Assert.assertEquals(i.value(), j.value());
   }
-  
+
   @Test
-  public void mergeTestSame() {
+  public void mergeTestSame(){
     OrSet<Integer> i = new OrSet<>(19);
     OrSet<Integer> j = new OrSet<>(19);
     OrSet<Integer> k = i.merge(j);
diff --git a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
index e91426c..df078aa 100644
--- a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
+++ b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
@@ -17,69 +17,81 @@
  */
 package org.apache.gossip;
 
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
+import io.teknek.tunit.TUnit;
+import org.apache.gossip.crdt.CrdtAddRemoveSet;
 import org.apache.gossip.crdt.GrowOnlyCounter;
 import org.apache.gossip.crdt.GrowOnlySet;
-import org.apache.gossip.crdt.LWWSet;
+import org.apache.gossip.crdt.LwwSet;
+import org.apache.gossip.crdt.MaxChangeSet;
 import org.apache.gossip.crdt.OrSet;
 import org.apache.gossip.crdt.PNCounter;
 import org.apache.gossip.manager.GossipManager;
 import org.apache.gossip.manager.GossipManagerBuilder;
 import org.apache.gossip.model.PerNodeDataMessage;
 import org.apache.gossip.model.SharedDataMessage;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-import io.teknek.tunit.TUnit;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
-public class DataTest extends AbstractIntegrationBase {
+public class DataTest {
+  private final String gCounterKey = "crdtgc";
+  private final String pnCounterKey = "crdtpn";
 
-  private String orSetKey = "cror";
-  private String lwwSetKey = "crlww";
-  private String gCounterKey = "crdtgc";
-  private String pnCounterKey = "crdtpn";
+  private static final List<GossipManager> clients = new ArrayList<>();
 
-  @Test
-  public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException {
+  @BeforeClass
+  public static void initializeMembers() throws InterruptedException, UnknownHostException, URISyntaxException{
+    final int clusterMembers = 2;
+
     GossipSettings settings = new GossipSettings();
     settings.setPersistRingState(false);
     settings.setPersistDataState(false);
     String cluster = UUID.randomUUID().toString();
-    int seedNodes = 1;
     List<Member> startupMembers = new ArrayList<>();
-    for (int i = 1; i < seedNodes + 1; ++i) {
-      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
-      startupMembers.add(new RemoteMember(cluster, uri, i + ""));
+    for (int i = 0; i < clusterMembers; ++i){
+      int id = i + 1;
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + id));
+      startupMembers.add(new RemoteMember(cluster, uri, id + ""));
     }
-    final List<GossipManager> clients = new ArrayList<>();
-    final int clusterMembers = 2;
-    for (int i = 1; i < clusterMembers + 1; ++i) {
-      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
-      GossipManager gossipService = GossipManagerBuilder
-              .newBuilder()
-              .cluster(cluster).uri(uri)
-              .id(i + "")
-              .gossipMembers(startupMembers)
-              .gossipSettings(settings).build();
+
+    for (Member member : startupMembers){
+      GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(member.getUri())
+          .id(member.getId()).gossipMembers(startupMembers).gossipSettings(settings).build();
       clients.add(gossipService);
       gossipService.init();
-      register(gossipService);
     }
+  }
+
+  @AfterClass
+  public static void shutdownMembers(){
+    for (final GossipManager client : clients){
+      client.shutdown();
+    }
+  }
+
+  @Test
+  public void simpleDataTest(){
     TUnit.assertThat(() -> {
       int total = 0;
-      for (int i = 0; i < clusterMembers; ++i) {
-        total += clients.get(i).getLiveMembers().size();
+      for (GossipManager client : clients){
+        total += client.getLiveMembers().size();
       }
       return total;
-    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
+    }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(2);
+
     clients.get(0).gossipPerNodeData(generatePerNodeMsg("a", "b"));
     clients.get(0).gossipSharedData(generateSharedMsg("a", "c"));
 
@@ -89,7 +101,7 @@
         return "";
       else
         return x.getPayload();
-    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
+    }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b");
 
     TUnit.assertThat(() -> {
       SharedDataMessage x = clients.get(1).findSharedGossipData("a");
@@ -97,175 +109,118 @@
         return "";
       else
         return x.getPayload();
-    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
-
-    givenDifferentDatumsInSet(clients);
-    assertThatListIsMerged(clients);
-
-    testOrSet(clients);
-    testLWWSet(clients);
-
-    testGrowOnlyCounter(clients);
-    testPNCounter(clients);
-
-    for (int i = 0; i < clusterMembers; ++i) {
-      clients.get(i).shutdown();
-    }
+    }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("c");
   }
 
-  private void testOrSet(final List<GossipManager> clients) {
-    // populate
-    clients.get(0).merge(generateSharedMsg(orSetKey, new OrSet<>("1", "2")));
-    clients.get(1).merge(generateSharedMsg(orSetKey, new OrSet<>("3", "4")));
+  Set<String> setFromList(String... elements){
+    return new HashSet<>(Arrays.asList(elements));
+  }
 
-    // assert merge
-    assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "3", "4").value());
-    assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "3", "4").value());
+  void crdtSetTest(String key, Function<Set<String>, CrdtAddRemoveSet<String, Set<String>, ?>> construct){
+    //populate
+    clients.get(0).merge(generateSharedMsg(key, construct.apply(setFromList("1", "2"))));
+    clients.get(1).merge(generateSharedMsg(key, construct.apply(setFromList("3", "4"))));
 
-    // drop element
+    assertMergedCrdt(key, construct.apply(setFromList("1", "2", "3", "4")).value());
+
+    //drop element
     @SuppressWarnings("unchecked")
-    OrSet<String> o = (OrSet<String>) clients.get(0).findCrdt(orSetKey);
-    OrSet<String> o2 = new OrSet<>(o, new OrSet.Builder<String>().remove("3"));
-    clients.get(0).merge(generateSharedMsg(orSetKey, o2));
+    CrdtAddRemoveSet<String, ?, ?> set = (CrdtAddRemoveSet<String, ?, ?>) clients.get(0).findCrdt(key);
+    clients.get(0).merge(generateSharedMsg(key, set.remove("3")));
 
-    // assert deletion
-    assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "4").value());
-    assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "4").value());
+    //assert deletion
+    assertMergedCrdt(key, construct.apply(setFromList("1", "2", "4")).value());
   }
 
-  private void testLWWSet(final List<GossipManager> clients) {
-    // populate
-    clients.get(0).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("1", "2")));
-    clients.get(1).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("3", "4")));
-
-    // assert merge
-    assertMerged(clients.get(0), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value());
-    assertMerged(clients.get(1), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value());
-
-    // drop element
-    @SuppressWarnings("unchecked")
-    LWWSet<String> lww = (LWWSet<String>) clients.get(0).findCrdt(lwwSetKey);
-    clients.get(0).merge(generateSharedMsg(lwwSetKey, lww.remove("3")));
-
-    // assert deletion
-    assertMerged(clients.get(0), lwwSetKey, new OrSet<>("1", "2", "4").value());
-    assertMerged(clients.get(1), lwwSetKey, new OrSet<>("1", "2", "4").value());
+  @Test
+  public void OrSetTest(){
+    crdtSetTest("cror", OrSet::new);
   }
 
-  private void testGrowOnlyCounter(List<GossipManager> clients) {
-    givenDifferentIncrement(clients);
-    assertThatCountIsUpdated(clients, 3);
-    givenIncreaseOther(clients);
-    assertThatCountIsUpdated(clients, 7);
+  @Test
+  public void LWWSetTest(){
+    crdtSetTest("crlww", LwwSet::new);
   }
 
-  private void testPNCounter(List<GossipManager> clients) {
-    givenPNCounter(clients);
-    assertThatPNCounterSettlesAt(clients, 0);
-    int[] delta1 = { 2, 3 };
-    givenPNCounterUpdate(clients, delta1);
-    assertThatPNCounterSettlesAt(clients, 5);
-    int[] delta2 = { -3, 5 };
-    givenPNCounterUpdate(clients, delta2);
-    assertThatPNCounterSettlesAt(clients, 7);
-    int[] delta3 = { 1, 1 };
-    givenPNCounterUpdate(clients, delta3);
-    assertThatPNCounterSettlesAt(clients, 9);
-    int[] delta4 = { 1, -7 };
-    givenPNCounterUpdate(clients, delta4);
-    assertThatPNCounterSettlesAt(clients, 3);
+  @Test
+  public void MaxChangeSetTest(){
+    crdtSetTest("crmcs", MaxChangeSet::new);
   }
 
-  private void givenDifferentIncrement(final List<GossipManager> clients) {
+  @Test
+  public void GrowOnlyCounterTest(){
+    Consumer<Long> assertCountUpdated = count -> {
+      for (GossipManager client : clients){
+        TUnit.assertThat(() -> client.findCrdt(gCounterKey))
+            .afterWaitingAtMost(10, TimeUnit.SECONDS)
+            .isEqualTo(new GrowOnlyCounter(new GrowOnlyCounter.Builder(client).increment(count)));
+      }
+    };
+    //generate different increment
     Object payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L));
     clients.get(0).merge(generateSharedMsg(gCounterKey, payload));
     payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2L));
     clients.get(1).merge(generateSharedMsg(gCounterKey, payload));
-  }
 
-  private void givenIncreaseOther(final List<GossipManager> clients) {
+    assertCountUpdated.accept((long) 3);
+
+    //update one
     GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey);
     GrowOnlyCounter gc2 = new GrowOnlyCounter(gc,
-            new GrowOnlyCounter.Builder(clients.get(1)).increment(4L));
-
+        new GrowOnlyCounter.Builder(clients.get(1)).increment(4L));
     clients.get(1).merge(generateSharedMsg(gCounterKey, gc2));
+
+    assertCountUpdated.accept((long) 7);
   }
 
-  private void assertMerged(final GossipManager client, String key, final Set<String> expected) {
-    TUnit.assertThat(() -> client.findCrdt(key).value()).afterWaitingAtMost(10, TimeUnit.SECONDS)
-            .isEqualTo(expected);
-  }
+  @Test
+  public void PNCounterTest(){
+    Consumer<List<Integer>> counterUpdate = list -> {
+      int clientIndex = 0;
+      for (int delta : list){
+        PNCounter c = (PNCounter) clients.get(clientIndex).findCrdt(pnCounterKey);
+        c = new PNCounter(c, new PNCounter.Builder(clients.get(clientIndex)).increment(((long) delta)));
+        clients.get(clientIndex).merge(generateSharedMsg(pnCounterKey, c));
+        clientIndex = (clientIndex + 1) % clients.size();
+      }
+    };
 
-  private void givenDifferentDatumsInSet(final List<GossipManager> clients) {
-    clients.get(0).merge(CrdtMessage("1"));
-    clients.get(1).merge(CrdtMessage("2"));
-  }
+    // given PNCounter
+    clients.get(0).merge(generateSharedMsg(pnCounterKey, new PNCounter(new PNCounter.Builder(clients.get(0)))));
+    clients.get(1).merge(generateSharedMsg(pnCounterKey, new PNCounter(new PNCounter.Builder(clients.get(1)))));
 
-  private void assertThatCountIsUpdated(final List<GossipManager> clients, long finalCount) {
-    TUnit.assertThat(() -> clients.get(0).findCrdt(gCounterKey))
-            .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter(
-                    new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount)));
-  }
+    assertMergedCrdt(pnCounterKey, (long) 0);
 
-  private void assertThatListIsMerged(final List<GossipManager> clients) {
-    TUnit.assertThat(() -> clients.get(0).findCrdt("cr")).afterWaitingAtMost(10, TimeUnit.SECONDS)
-            .isEqualTo(new GrowOnlySet<>(Arrays.asList("1", "2")));
-  }
-  
-  private void givenPNCounter(List<GossipManager> clients) {
-    {
-      SharedDataMessage d = new SharedDataMessage();
-      d.setKey(pnCounterKey);
-      d.setPayload(new PNCounter(new PNCounter.Builder(clients.get(0))));
-      d.setExpireAt(Long.MAX_VALUE);
-      d.setTimestamp(System.currentTimeMillis());
-      clients.get(0).merge(d);
-    }
-    {
-      SharedDataMessage d = new SharedDataMessage();
-      d.setKey(pnCounterKey);
-      d.setPayload(new PNCounter(new PNCounter.Builder(clients.get(1))));
-      d.setExpireAt(Long.MAX_VALUE);
-      d.setTimestamp(System.currentTimeMillis());
-      clients.get(1).merge(d);
+    List<List<Integer>> updateLists = new ArrayList<>();
+    updateLists.add(Arrays.asList(2, 3));
+    updateLists.add(Arrays.asList(-3, 5));
+    updateLists.add(Arrays.asList(1, 1));
+    updateLists.add(Arrays.asList(1, -7));
+
+    Long[] expectedResults = {5L, 7L, 9L, 3L};
+
+    for (int i = 0; i < updateLists.size(); i++){
+      counterUpdate.accept(updateLists.get(i));
+      assertMergedCrdt(pnCounterKey, expectedResults[i]);
     }
   }
 
-  private void givenPNCounterUpdate(List<GossipManager> clients, int[] deltaArray) {
-    int clientIndex = 0;
-    for (int delta: deltaArray) {
-      PNCounter c = (PNCounter) clients.get(clientIndex).findCrdt(pnCounterKey);
-      c = new PNCounter(c, new PNCounter.Builder(clients.get(clientIndex)).increment(((long)delta)));
-      SharedDataMessage d = new SharedDataMessage();
-      d.setKey(pnCounterKey);
-      d.setPayload(c);
-      d.setExpireAt(Long.MAX_VALUE);
-      d.setTimestamp(System.currentTimeMillis());
-      clients.get(clientIndex).merge(d);
-      clientIndex = (clientIndex + 1) % clients.size();
+  @Test
+  public void GrowOnlySetTest(){
+    clients.get(0).merge(generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList("1"))));
+    clients.get(1).merge(generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList("2"))));
+
+    assertMergedCrdt("cr", new GrowOnlySet<>(Arrays.asList("1", "2")).value());
+  }
+
+  private void assertMergedCrdt(String key, Object expected){
+    for (GossipManager client : clients){
+      TUnit.assertThat(() -> client.findCrdt(key).value())
+          .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(expected);
     }
   }
 
-  private void assertThatPNCounterSettlesAt(List<GossipManager> clients, long expectedValue) {
-    for (GossipManager client: clients) {
-      TUnit.assertThat(() -> {
-        long value = 0;
-        Object o = client.findCrdt(pnCounterKey);
-        if (o != null) {
-          PNCounter c = (PNCounter)o;
-          value = c.value();
-        }
-        return value;
-      }).afterWaitingAtMost(10, TimeUnit.SECONDS)
-              .isEqualTo(expectedValue);
-    }
-  }
-
-  private SharedDataMessage CrdtMessage(String item) {
-    return generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList(item)));
-  }
-
-  private PerNodeDataMessage generatePerNodeMsg(String key, Object payload) {
+  private PerNodeDataMessage generatePerNodeMsg(String key, Object payload){
     PerNodeDataMessage g = new PerNodeDataMessage();
     g.setExpireAt(Long.MAX_VALUE);
     g.setKey(key);
@@ -274,7 +229,7 @@
     return g;
   }
 
-  private SharedDataMessage generateSharedMsg(String key, Object payload) {
+  private SharedDataMessage generateSharedMsg(String key, Object payload){
     SharedDataMessage d = new SharedDataMessage();
     d.setKey(key);
     d.setPayload(payload);
diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java
index 3c90ea1..d391fa1 100644
--- a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java
+++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java
@@ -22,7 +22,8 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.gossip.GossipSettings;
 import org.apache.gossip.Member;
-import org.apache.gossip.crdt.LWWSet;
+import org.apache.gossip.crdt.LwwSet;
+import org.apache.gossip.crdt.MaxChangeSet;
 import org.apache.gossip.crdt.OrSet;
 import org.apache.gossip.manager.GossipManager;
 import org.apache.gossip.manager.GossipManagerBuilder;
@@ -39,7 +40,7 @@
 import java.util.UUID;
 
 public class JacksonTest {
-  
+
   private static GossipSettings simpleSettings(GossipSettings settings) {
     settings.setPersistRingState(false);
     settings.setPersistDataState(false);
@@ -47,12 +48,12 @@
     settings.setProtocolManagerClass("org.apache.gossip.protocol.json.JacksonProtocolManager");
     return settings;
   }
-  
+
   private static GossipSettings withSigning(GossipSettings settings) {
     settings.setSignMessages(true);
     return settings;
   }
-  
+
   // formerly of SignedMessageTest.
   @Test(expected = IllegalArgumentException.class)
   public void ifSignMustHaveKeys()
@@ -70,11 +71,11 @@
             .build();
     gossipService.init();
   }
-  
+
   @Test
   public void jacksonSerialTest() throws InterruptedException, URISyntaxException, IOException {
     ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(simpleSettings(new GossipSettings()));
-    
+
     OrSet<Integer> i = new OrSet<Integer>(new OrSet.Builder<Integer>().add(1).remove(1));
     String s = objectMapper.writeValueAsString(i);
     @SuppressWarnings("unchecked")
@@ -82,38 +83,50 @@
     Assert.assertEquals(back, i);
   }
 
-  @Test
-  public void jacksonCrdtLWWSetTest() {
+  void jacksonCrdtSeDeTest(Object value, Class<?> cl){
     ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(simpleSettings(new GossipSettings()));
 
-    LWWSet<String> lww = new LWWSet<>("a", "b", "c");
-
     try {
-      String lwwS = objectMapper.writeValueAsString(lww);
+      String valueS = objectMapper.writeValueAsString(value);
       @SuppressWarnings("unchecked")
-      LWWSet<String> parsedLww = objectMapper.readValue(lwwS, LWWSet.class);
-      Assert.assertEquals(lww, parsedLww);
+      Object parsedValue = objectMapper.readValue(valueS, cl);
+      Assert.assertEquals(value, parsedValue);
     } catch (Exception e) {
-      Assert.fail("LWWSet se/de error");
+      Assert.fail("Jackson se/de error");
     }
   }
-  
+
+  @Test
+  public void jacksonOrSetTest(){
+    jacksonCrdtSeDeTest(new OrSet<>("1", "2", "3"), OrSet.class);
+  }
+
+  @Test
+  public void jacksonLWWSetTest(){
+    jacksonCrdtSeDeTest(new LwwSet<>("1", "2", "3"), LwwSet.class);
+  }
+
+  @Test
+  public void jacksonMaxChangeSetTest(){
+    jacksonCrdtSeDeTest(new MaxChangeSet<>("1", "2", "3"), MaxChangeSet.class);
+  }
+
   @Test
   public void testMessageEqualityAssumptions() {
     long timeA = System.nanoTime();
     long timeB = System.nanoTime();
     Assert.assertNotEquals(timeA, timeB);
-    
+
     TestMessage messageA0 = new TestMessage(Long.toHexString(timeA));
     TestMessage messageA1 = new TestMessage(Long.toHexString(timeA));
     TestMessage messageB = new TestMessage(Long.toHexString(timeB));
-    
+
     Assert.assertEquals(messageA0, messageA1);
     Assert.assertFalse(messageA0 == messageA1);
     Assert.assertNotEquals(messageA0, messageB);
     Assert.assertNotEquals(messageA1, messageB);
   }
-  
+
   // ideally, we would test the serializability of every message type, but we just want to make sure this works in
   // basic cases.
   @Test