GOSSIP-66 Implement Crdt 2P-Set
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
index 7ec96e7..ab5cefa 100644
--- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
@@ -54,6 +54,13 @@
@JsonProperty("data") abstract Map<E, Integer> getStruct();
}
+abstract class TwoPhaseSetMixin<E> {
+ @JsonCreator
+ TwoPhaseSetMixin(@JsonProperty("added") Set<E> added, @JsonProperty("removed") Set<E> removed) { }
+ @JsonProperty("added") abstract Set<E> getAdded();
+ @JsonProperty("removed") abstract Set<E> getRemoved();
+}
+
abstract class GrowOnlySetMixin<E>{
@JsonCreator
GrowOnlySetMixin(@JsonProperty("elements") Set<E> elements){ }
@@ -93,6 +100,7 @@
context.setMixInAnnotations(LwwSet.class, LWWSetMixin.class);
context.setMixInAnnotations(LwwSet.Timestamps.class, LWWSetTimestampsMixin.class);
context.setMixInAnnotations(MaxChangeSet.class, MaxChangeSetMixin.class);
+ context.setMixInAnnotations(TwoPhaseSet.class, TwoPhaseSetMixin.class);
}
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/TwoPhaseSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/TwoPhaseSet.java
new file mode 100644
index 0000000..a1f44a9
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/TwoPhaseSet.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/*
+ Two-Phase CrdtSet.
+ You can add element only once and remove only once.
+ You cannot remove element which is not present.
+
+ Read more: https://github.com/aphyr/meangirls#2p-set
+ You can view examples of usage in tests:
+ TwoPhaseSetTest - unit tests
+ DataTest - integration test with 2 nodes, TwoPhaseSet was serialized/deserialized, sent between nodes, merged
+*/
+
+public class TwoPhaseSet<ElementType> implements CrdtAddRemoveSet<ElementType, Set<ElementType>, TwoPhaseSet<ElementType>> {
+ private final Set<ElementType> added;
+ private final Set<ElementType> removed;
+
+ public TwoPhaseSet(){
+ added = new HashSet<>();
+ removed = new HashSet<>();
+ }
+
+ @SafeVarargs
+ public TwoPhaseSet(ElementType... elements){
+ this(new HashSet<>(Arrays.asList(elements)));
+ }
+
+ public TwoPhaseSet(Set<ElementType> set){
+ this();
+ for (ElementType e : set){
+ added.add(e);
+ }
+ }
+
+ public TwoPhaseSet(TwoPhaseSet<ElementType> first, TwoPhaseSet<ElementType> second){
+ BiFunction<Set<ElementType>, Set<ElementType>, Set<ElementType>> mergeSets = (f, s) ->
+ Stream.concat(f.stream(), s.stream()).collect(Collectors.toSet());
+
+ added = mergeSets.apply(first.added, second.added);
+ removed = mergeSets.apply(first.removed, second.removed);
+ }
+
+ TwoPhaseSet(Set<ElementType> added, Set<ElementType> removed){
+ this.added = added;
+ this.removed = removed;
+ }
+
+ Set<ElementType> getAdded(){
+ return added;
+ }
+
+ Set<ElementType> getRemoved(){
+ return removed;
+ }
+
+ public TwoPhaseSet<ElementType> add(ElementType e){
+ if (removed.contains(e) || added.contains(e)){
+ return this;
+ }
+ return this.merge(new TwoPhaseSet<>(e));
+ }
+
+ public TwoPhaseSet<ElementType> remove(ElementType e){
+ if (removed.contains(e) || !added.contains(e)){
+ return this;
+ }
+ Set<ElementType> eSet = new HashSet<>(Collections.singletonList(e));
+ return this.merge(new TwoPhaseSet<>(eSet, eSet));
+ }
+
+ @Override
+ public TwoPhaseSet<ElementType> merge(TwoPhaseSet<ElementType> other){
+ return new TwoPhaseSet<>(this, other);
+ }
+
+ @Override
+ public Set<ElementType> value(){
+ return added.stream().filter(e -> !removed.contains(e)).collect(Collectors.toSet());
+ }
+
+ @Override
+ public TwoPhaseSet<ElementType> optimize(){
+ return new TwoPhaseSet<>(value(), removed);
+ }
+
+ @Override
+ public boolean equals(Object obj){
+ return this == obj || (obj != null && getClass() == obj.getClass() && value().equals(((TwoPhaseSet) obj).value()));
+ }
+}
\ No newline at end of file
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/AddRemoveStringSetTest.java
similarity index 94%
rename from gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java
rename to gossip-base/src/test/java/org/apache/gossip/crdt/AddRemoveStringSetTest.java
index d4db4ce..6dac9df 100644
--- a/gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/AddRemoveStringSetTest.java
@@ -19,8 +19,9 @@
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Test;
import org.junit.Ignore;
+import org.junit.Test;
+
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
@@ -28,6 +29,8 @@
/*
Abstract test suit to test CrdtSets with Add and Remove operations.
+ You can use this suite only if your set supports multiple additions/deletions
+ and has behavior similar to Set in single-threaded environment.
It compares them with simple sets, validates add, remove, equals, value, etc. operations
To use it you should:
1. subclass this and implement constructors
@@ -36,7 +39,8 @@
*/
@Ignore
-public abstract class AbstractCRDTStringSetTest<SetType extends CrdtAddRemoveSet<String, Set<String>, SetType>> {
+public abstract class AddRemoveStringSetTest<SetType extends CrdtAddRemoveSet<String, Set<String>, SetType>> {
+
abstract SetType construct(Set<String> set);
abstract SetType construct();
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java
index 8200b15..c4da83d 100644
--- a/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java
@@ -27,7 +27,7 @@
import java.util.Map;
import java.util.Set;
-public class LwwSetTest extends AbstractCRDTStringSetTest<LwwSet<String>> {
+public class LwwSetTest extends AddRemoveStringSetTest<LwwSet<String>> {
static private Clock clock = new SystemClock();
LwwSet<String> construct(Set<String> set){
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java
index 2ba3f09..3828747 100644
--- a/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java
@@ -25,7 +25,7 @@
import java.util.Map;
import java.util.Set;
-public class MaxChangeSetTest extends AbstractCRDTStringSetTest<MaxChangeSet<String>> {
+public class MaxChangeSetTest extends AddRemoveStringSetTest<MaxChangeSet<String>> {
MaxChangeSet<String> construct(Set<String> set){
return new MaxChangeSet<>(set);
}
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
index bdaada9..8b21360 100644
--- a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
@@ -25,7 +25,7 @@
import java.util.SortedSet;
import java.util.TreeSet;
-public class OrSetTest extends AbstractCRDTStringSetTest<OrSet<String>> {
+public class OrSetTest extends AddRemoveStringSetTest<OrSet<String>> {
OrSet<String> construct(){
return new OrSet<>();
}
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/TwoPhaseSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/TwoPhaseSetTest.java
new file mode 100644
index 0000000..3af1920
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/TwoPhaseSetTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+public class TwoPhaseSetTest {
+
+ private Set<String> sampleSet;
+
+ @Before
+ public void setup(){
+ sampleSet = new HashSet<>();
+ sampleSet.add("a");
+ sampleSet.add("b");
+ sampleSet.add("d");
+ }
+
+ @Test
+ public void setConstructorTest(){
+ Assert.assertEquals(new TwoPhaseSet<>(sampleSet).value(), sampleSet);
+ }
+
+ @Test
+ public void valueTest(){
+ Set<Character> added = new HashSet<>();
+ added.add('a');
+ added.add('b');
+ Set<Character> removed = new HashSet<>();
+ removed.add('b');
+ Assert.assertEquals(new TwoPhaseSet<>(added, removed), new TwoPhaseSet<>('a'));
+ }
+
+ @Test
+ public void optimizeTest(){
+ TwoPhaseSet<String> set = new TwoPhaseSet<>(sampleSet);
+ set = set.remove("b");
+ Assert.assertEquals(set.optimize(), set);
+ // check that optimize in this case actually works
+ Assert.assertTrue(set.optimize().getAdded().size() < set.getAdded().size());
+ }
+
+ @Test
+ public void immutabilityTest(){
+ TwoPhaseSet<String> set = new TwoPhaseSet<>(sampleSet);
+ TwoPhaseSet<String> newSet = set.remove("b");
+ Assert.assertNotEquals(set, newSet);
+ Assert.assertEquals(set, new TwoPhaseSet<>(sampleSet));
+ }
+
+ @Test
+ public void removeMissingAddExistingLimitsTest(){
+ BiConsumer<TwoPhaseSet<?>, TwoPhaseSet<?>> checkInternals = (f, s) -> {
+ Assert.assertEquals(s, f);
+ Assert.assertEquals(s.getRemoved(), f.getRemoved());
+ Assert.assertEquals(s.getAdded(), f.getAdded());
+ };
+ TwoPhaseSet<String> set = new TwoPhaseSet<>(sampleSet);
+ // remove missing
+ checkInternals.accept(set, set.remove("e"));
+ // add existing
+ checkInternals.accept(set, set.add("a"));
+ // limits
+ TwoPhaseSet<String> newSet = set.remove("a"); // allow this remove
+ Assert.assertEquals(newSet.add("a"), new TwoPhaseSet<>("b", "d")); // discard this add, "a" was added and removed
+ }
+
+ @Test
+ public void mergeTest(){
+ TwoPhaseSet<String> f = new TwoPhaseSet<>(sampleSet);
+ TwoPhaseSet<String> s = new TwoPhaseSet<>("a", "c");
+ s = s.remove("a");
+ TwoPhaseSet<String> res = f.merge(s);
+ Assert.assertEquals(res, new TwoPhaseSet<>(f, s)); // check two-sets constructor
+
+ // "a" was both added and deleted in second set => it's deleted in result
+ // "b" and "d" comes from first set and "c" comes from second
+ Assert.assertEquals(res, new TwoPhaseSet<>("b", "c", "d"));
+ }
+}
diff --git a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
index df078aa..c16174f 100644
--- a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
+++ b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
@@ -25,6 +25,7 @@
import org.apache.gossip.crdt.MaxChangeSet;
import org.apache.gossip.crdt.OrSet;
import org.apache.gossip.crdt.PNCounter;
+import org.apache.gossip.crdt.TwoPhaseSet;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.model.PerNodeDataMessage;
@@ -148,6 +149,11 @@
}
@Test
+ public void TwoPhaseSetTest(){
+ crdtSetTest("crtps", TwoPhaseSet::new);
+ }
+
+ @Test
public void GrowOnlyCounterTest(){
Consumer<Long> assertCountUpdated = count -> {
for (GossipManager client : clients){
diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java
index d391fa1..2a5239c 100644
--- a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java
+++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java
@@ -25,6 +25,7 @@
import org.apache.gossip.crdt.LwwSet;
import org.apache.gossip.crdt.MaxChangeSet;
import org.apache.gossip.crdt.OrSet;
+import org.apache.gossip.crdt.TwoPhaseSet;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.protocol.ProtocolManager;
@@ -98,17 +99,22 @@
@Test
public void jacksonOrSetTest(){
- jacksonCrdtSeDeTest(new OrSet<>("1", "2", "3"), OrSet.class);
+ jacksonCrdtSeDeTest(new OrSet<>("1", "2", "3").remove("2"), OrSet.class);
}
@Test
public void jacksonLWWSetTest(){
- jacksonCrdtSeDeTest(new LwwSet<>("1", "2", "3"), LwwSet.class);
+ jacksonCrdtSeDeTest(new LwwSet<>("1", "2", "3").remove("2"), LwwSet.class);
}
@Test
public void jacksonMaxChangeSetTest(){
- jacksonCrdtSeDeTest(new MaxChangeSet<>("1", "2", "3"), MaxChangeSet.class);
+ jacksonCrdtSeDeTest(new MaxChangeSet<>("1", "2", "3").remove("2"), MaxChangeSet.class);
+ }
+
+ @Test
+ public void jacksonTwoPhaseSetTest(){
+ jacksonCrdtSeDeTest(new TwoPhaseSet<>("1", "2", "3").remove("2"), TwoPhaseSet.class);
}
@Test