GOSSIP-66 Implement Crdt 2P-Set
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
index 7ec96e7..ab5cefa 100644
--- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
@@ -54,6 +54,13 @@
   @JsonProperty("data") abstract Map<E, Integer> getStruct();
 }
 
+abstract class TwoPhaseSetMixin<E> {
+  @JsonCreator
+  TwoPhaseSetMixin(@JsonProperty("added") Set<E> added, @JsonProperty("removed") Set<E> removed) { }
+  @JsonProperty("added") abstract Set<E> getAdded();
+  @JsonProperty("removed") abstract Set<E> getRemoved();
+}
+
 abstract class GrowOnlySetMixin<E>{
   @JsonCreator
   GrowOnlySetMixin(@JsonProperty("elements") Set<E> elements){ }
@@ -93,6 +100,7 @@
     context.setMixInAnnotations(LwwSet.class, LWWSetMixin.class);
     context.setMixInAnnotations(LwwSet.Timestamps.class, LWWSetTimestampsMixin.class);
     context.setMixInAnnotations(MaxChangeSet.class, MaxChangeSetMixin.class);
+    context.setMixInAnnotations(TwoPhaseSet.class, TwoPhaseSetMixin.class);
   }
 
 }
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/TwoPhaseSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/TwoPhaseSet.java
new file mode 100644
index 0000000..a1f44a9
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/TwoPhaseSet.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/*
+  Two-Phase CrdtSet.
+  You can add element only once and remove only once.
+  You cannot remove element which is not present.
+
+  Read more: https://github.com/aphyr/meangirls#2p-set
+  You can view examples of usage in tests:
+  TwoPhaseSetTest - unit tests
+  DataTest - integration test with 2 nodes, TwoPhaseSet was serialized/deserialized, sent between nodes, merged
+*/
+
+public class TwoPhaseSet<ElementType> implements CrdtAddRemoveSet<ElementType, Set<ElementType>, TwoPhaseSet<ElementType>> {
+  private final Set<ElementType> added;
+  private final Set<ElementType> removed;
+
+  public TwoPhaseSet(){
+    added = new HashSet<>();
+    removed = new HashSet<>();
+  }
+
+  @SafeVarargs
+  public TwoPhaseSet(ElementType... elements){
+    this(new HashSet<>(Arrays.asList(elements)));
+  }
+
+  public TwoPhaseSet(Set<ElementType> set){
+    this();
+    for (ElementType e : set){
+      added.add(e);
+    }
+  }
+
+  public TwoPhaseSet(TwoPhaseSet<ElementType> first, TwoPhaseSet<ElementType> second){
+    BiFunction<Set<ElementType>, Set<ElementType>, Set<ElementType>> mergeSets = (f, s) ->
+        Stream.concat(f.stream(), s.stream()).collect(Collectors.toSet());
+
+    added = mergeSets.apply(first.added, second.added);
+    removed = mergeSets.apply(first.removed, second.removed);
+  }
+
+  TwoPhaseSet(Set<ElementType> added, Set<ElementType> removed){
+    this.added = added;
+    this.removed = removed;
+  }
+
+  Set<ElementType> getAdded(){
+    return added;
+  }
+
+  Set<ElementType> getRemoved(){
+    return removed;
+  }
+
+  public TwoPhaseSet<ElementType> add(ElementType e){
+    if (removed.contains(e) || added.contains(e)){
+      return this;
+    }
+    return this.merge(new TwoPhaseSet<>(e));
+  }
+
+  public TwoPhaseSet<ElementType> remove(ElementType e){
+    if (removed.contains(e) || !added.contains(e)){
+      return this;
+    }
+    Set<ElementType> eSet = new HashSet<>(Collections.singletonList(e));
+    return this.merge(new TwoPhaseSet<>(eSet, eSet));
+  }
+
+  @Override
+  public TwoPhaseSet<ElementType> merge(TwoPhaseSet<ElementType> other){
+    return new TwoPhaseSet<>(this, other);
+  }
+
+  @Override
+  public Set<ElementType> value(){
+    return added.stream().filter(e -> !removed.contains(e)).collect(Collectors.toSet());
+  }
+
+  @Override
+  public TwoPhaseSet<ElementType> optimize(){
+    return new TwoPhaseSet<>(value(), removed);
+  }
+
+  @Override
+  public boolean equals(Object obj){
+    return this == obj || (obj != null && getClass() == obj.getClass() && value().equals(((TwoPhaseSet) obj).value()));
+  }
+}
\ No newline at end of file
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/AddRemoveStringSetTest.java
similarity index 94%
rename from gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java
rename to gossip-base/src/test/java/org/apache/gossip/crdt/AddRemoveStringSetTest.java
index d4db4ce..6dac9df 100644
--- a/gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/AddRemoveStringSetTest.java
@@ -19,8 +19,9 @@
 
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Test;
 import org.junit.Ignore;
+import org.junit.Test;
+
 import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -28,6 +29,8 @@
 
 /*
   Abstract test suit to test CrdtSets with Add and Remove operations.
+  You can use this suite only if your set supports multiple additions/deletions
+    and has behavior similar to Set in single-threaded environment.
   It compares them with simple sets, validates add, remove, equals, value, etc. operations
   To use it you should:
   1. subclass this and implement constructors
@@ -36,7 +39,8 @@
 */
 
 @Ignore
-public abstract class AbstractCRDTStringSetTest<SetType extends CrdtAddRemoveSet<String, Set<String>, SetType>> {
+public abstract class AddRemoveStringSetTest<SetType extends CrdtAddRemoveSet<String, Set<String>, SetType>> {
+
   abstract SetType construct(Set<String> set);
 
   abstract SetType construct();
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java
index 8200b15..c4da83d 100644
--- a/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java
@@ -27,7 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 
-public class LwwSetTest extends AbstractCRDTStringSetTest<LwwSet<String>> {
+public class LwwSetTest extends AddRemoveStringSetTest<LwwSet<String>> {
   static private Clock clock = new SystemClock();
 
   LwwSet<String> construct(Set<String> set){
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java
index 2ba3f09..3828747 100644
--- a/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java
@@ -25,7 +25,7 @@
 import java.util.Map;
 import java.util.Set;
 
-public class MaxChangeSetTest extends AbstractCRDTStringSetTest<MaxChangeSet<String>> {
+public class MaxChangeSetTest extends AddRemoveStringSetTest<MaxChangeSet<String>> {
   MaxChangeSet<String> construct(Set<String> set){
     return new MaxChangeSet<>(set);
   }
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
index bdaada9..8b21360 100644
--- a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
@@ -25,7 +25,7 @@
 import java.util.SortedSet;
 import java.util.TreeSet;
 
-public class OrSetTest extends AbstractCRDTStringSetTest<OrSet<String>> {
+public class OrSetTest extends AddRemoveStringSetTest<OrSet<String>> {
   OrSet<String> construct(){
     return new OrSet<>();
   }
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/TwoPhaseSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/TwoPhaseSetTest.java
new file mode 100644
index 0000000..3af1920
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/TwoPhaseSetTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+public class TwoPhaseSetTest {
+
+  private Set<String> sampleSet;
+
+  @Before
+  public void setup(){
+    sampleSet = new HashSet<>();
+    sampleSet.add("a");
+    sampleSet.add("b");
+    sampleSet.add("d");
+  }
+
+  @Test
+  public void setConstructorTest(){
+    Assert.assertEquals(new TwoPhaseSet<>(sampleSet).value(), sampleSet);
+  }
+
+  @Test
+  public void valueTest(){
+    Set<Character> added = new HashSet<>();
+    added.add('a');
+    added.add('b');
+    Set<Character> removed = new HashSet<>();
+    removed.add('b');
+    Assert.assertEquals(new TwoPhaseSet<>(added, removed), new TwoPhaseSet<>('a'));
+  }
+
+  @Test
+  public void optimizeTest(){
+    TwoPhaseSet<String> set = new TwoPhaseSet<>(sampleSet);
+    set = set.remove("b");
+    Assert.assertEquals(set.optimize(), set);
+    // check that optimize in this case actually works
+    Assert.assertTrue(set.optimize().getAdded().size() < set.getAdded().size());
+  }
+
+  @Test
+  public void immutabilityTest(){
+    TwoPhaseSet<String> set = new TwoPhaseSet<>(sampleSet);
+    TwoPhaseSet<String> newSet = set.remove("b");
+    Assert.assertNotEquals(set, newSet);
+    Assert.assertEquals(set, new TwoPhaseSet<>(sampleSet));
+  }
+
+  @Test
+  public void removeMissingAddExistingLimitsTest(){
+    BiConsumer<TwoPhaseSet<?>, TwoPhaseSet<?>> checkInternals = (f, s) -> {
+      Assert.assertEquals(s, f);
+      Assert.assertEquals(s.getRemoved(), f.getRemoved());
+      Assert.assertEquals(s.getAdded(), f.getAdded());
+    };
+    TwoPhaseSet<String> set = new TwoPhaseSet<>(sampleSet);
+    // remove missing
+    checkInternals.accept(set, set.remove("e"));
+    // add existing
+    checkInternals.accept(set, set.add("a"));
+    // limits
+    TwoPhaseSet<String> newSet = set.remove("a"); // allow this remove
+    Assert.assertEquals(newSet.add("a"), new TwoPhaseSet<>("b", "d")); // discard this add, "a" was added and removed
+  }
+
+  @Test
+  public void mergeTest(){
+    TwoPhaseSet<String> f = new TwoPhaseSet<>(sampleSet);
+    TwoPhaseSet<String> s = new TwoPhaseSet<>("a", "c");
+    s = s.remove("a");
+    TwoPhaseSet<String> res = f.merge(s);
+    Assert.assertEquals(res, new TwoPhaseSet<>(f, s)); // check two-sets constructor
+
+    // "a" was both added and deleted in second set => it's deleted in result
+    // "b" and "d" comes from first set and "c" comes from second
+    Assert.assertEquals(res, new TwoPhaseSet<>("b", "c", "d"));
+  }
+}
diff --git a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
index df078aa..c16174f 100644
--- a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
+++ b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
@@ -25,6 +25,7 @@
 import org.apache.gossip.crdt.MaxChangeSet;
 import org.apache.gossip.crdt.OrSet;
 import org.apache.gossip.crdt.PNCounter;
+import org.apache.gossip.crdt.TwoPhaseSet;
 import org.apache.gossip.manager.GossipManager;
 import org.apache.gossip.manager.GossipManagerBuilder;
 import org.apache.gossip.model.PerNodeDataMessage;
@@ -148,6 +149,11 @@
   }
 
   @Test
+  public void TwoPhaseSetTest(){
+    crdtSetTest("crtps", TwoPhaseSet::new);
+  }
+
+  @Test
   public void GrowOnlyCounterTest(){
     Consumer<Long> assertCountUpdated = count -> {
       for (GossipManager client : clients){
diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java
index d391fa1..2a5239c 100644
--- a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java
+++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java
@@ -25,6 +25,7 @@
 import org.apache.gossip.crdt.LwwSet;
 import org.apache.gossip.crdt.MaxChangeSet;
 import org.apache.gossip.crdt.OrSet;
+import org.apache.gossip.crdt.TwoPhaseSet;
 import org.apache.gossip.manager.GossipManager;
 import org.apache.gossip.manager.GossipManagerBuilder;
 import org.apache.gossip.protocol.ProtocolManager;
@@ -98,17 +99,22 @@
 
   @Test
   public void jacksonOrSetTest(){
-    jacksonCrdtSeDeTest(new OrSet<>("1", "2", "3"), OrSet.class);
+    jacksonCrdtSeDeTest(new OrSet<>("1", "2", "3").remove("2"), OrSet.class);
   }
 
   @Test
   public void jacksonLWWSetTest(){
-    jacksonCrdtSeDeTest(new LwwSet<>("1", "2", "3"), LwwSet.class);
+    jacksonCrdtSeDeTest(new LwwSet<>("1", "2", "3").remove("2"), LwwSet.class);
   }
 
   @Test
   public void jacksonMaxChangeSetTest(){
-    jacksonCrdtSeDeTest(new MaxChangeSet<>("1", "2", "3"), MaxChangeSet.class);
+    jacksonCrdtSeDeTest(new MaxChangeSet<>("1", "2", "3").remove("2"), MaxChangeSet.class);
+  }
+
+  @Test
+  public void jacksonTwoPhaseSetTest(){
+    jacksonCrdtSeDeTest(new TwoPhaseSet<>("1", "2", "3").remove("2"), TwoPhaseSet.class);
   }
 
   @Test