GOSSIP-63 Added CRDT G-Counter implementation
diff --git a/src/main/java/org/apache/gossip/crdt/CrdtCounter.java b/src/main/java/org/apache/gossip/crdt/CrdtCounter.java
new file mode 100644
index 0000000..cdc9445
--- /dev/null
+++ b/src/main/java/org/apache/gossip/crdt/CrdtCounter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+public interface CrdtCounter<ValueType extends Number, R extends CrdtCounter<ValueType, R>>
+ extends Crdt<ValueType, R> {
+
+}
+
diff --git a/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/src/main/java/org/apache/gossip/crdt/CrdtModule.java
index 0c8a787..cfb3f47 100644
--- a/src/main/java/org/apache/gossip/crdt/CrdtModule.java
+++ b/src/main/java/org/apache/gossip/crdt/CrdtModule.java
@@ -42,6 +42,12 @@
@JsonIgnore abstract boolean isEmpty();
}
+abstract class GrowOnlyCounterMixin {
+ @JsonCreator
+ GrowOnlyCounterMixin(@JsonProperty("counters") Map<String, Long> counters) { }
+ @JsonProperty("counters") abstract Map<String, Long> getCounters();
+}
+
//If anyone wants to take a stab at this. please have at it
//https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java
public class CrdtModule extends SimpleModule {
@@ -56,6 +62,7 @@
public void setupModule(SetupContext context) {
context.setMixInAnnotations(OrSet.class, OrSetMixin.class);
context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class);
+ context.setMixInAnnotations(GrowOnlyCounter.class, GrowOnlyCounterMixin.class);
}
}
diff --git a/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java b/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java
new file mode 100644
index 0000000..9156142
--- /dev/null
+++ b/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gossip.crdt;
+
+import org.apache.gossip.manager.GossipManager;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class GrowOnlyCounter implements CrdtCounter<Long, GrowOnlyCounter> {
+
+ private final Map<String, Long> counters = new HashMap<>();
+
+ GrowOnlyCounter(Map<String, Long> counters) {
+ this.counters.putAll(counters);
+ }
+
+ public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, Builder builder) {
+ counters.putAll(growOnlyCounter.counters);
+ if (counters.containsKey(builder.myId)) {
+ Long newValue = counters.get(builder.myId) + builder.counter;
+ counters.replace(builder.myId, newValue);
+ } else {
+ counters.put(builder.myId, builder.counter);
+ }
+ }
+
+ public GrowOnlyCounter(Builder builder) {
+ counters.put(builder.myId, builder.counter);
+ }
+
+ public GrowOnlyCounter(GossipManager manager) {
+ counters.put(manager.getMyself().getId(), 0L);
+ }
+
+ public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, GrowOnlyCounter other) {
+ counters.putAll(growOnlyCounter.counters);
+ for (Map.Entry<String, Long> entry : other.counters.entrySet()) {
+ String otherKey = entry.getKey();
+ Long otherValue = entry.getValue();
+
+ if (counters.containsKey(otherKey)) {
+ Long newValue = Math.max(counters.get(otherKey), otherValue);
+ counters.replace(otherKey, newValue);
+ } else {
+ counters.put(otherKey, otherValue);
+ }
+ }
+ }
+
+ @Override
+ public GrowOnlyCounter merge(GrowOnlyCounter other) {
+ return new GrowOnlyCounter(this, other);
+ }
+
+ @Override
+ public Long value() {
+ Long globalCount = 0L;
+ for (Long increment : counters.values()) {
+ globalCount += increment;
+ }
+ return globalCount;
+ }
+
+ @Override
+ public GrowOnlyCounter optimize() {
+ return new GrowOnlyCounter(counters);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (getClass() != obj.getClass())
+ return false;
+ GrowOnlyCounter other = (GrowOnlyCounter) obj;
+ return value().longValue() == other.value().longValue();
+ }
+
+ @Override
+ public String toString() {
+ return "GrowOnlyCounter [counters= " + counters + ", Value=" + value() + "]";
+ }
+
+ Map<String, Long> getCounters() {
+ return counters;
+ }
+
+ public static class Builder {
+
+ private final String myId;
+
+ private Long counter;
+
+ public Builder(GossipManager gossipManager) {
+ myId = gossipManager.getMyself().getId();
+ counter = 0L;
+ }
+
+ public GrowOnlyCounter.Builder increment(Integer count) {
+ counter += count;
+ return this;
+ }
+ }
+}
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
index 3892e9b..a8d57a7 100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -28,6 +28,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import org.apache.gossip.crdt.GrowOnlyCounter;
import org.apache.gossip.crdt.GrowOnlySet;
import org.apache.gossip.crdt.OrSet;
import org.apache.gossip.model.GossipDataMessage;
@@ -39,6 +40,7 @@
public class DataTest {
private String orSetKey = "cror";
+ private String gCounterKey = "crdtgc";
@Test
public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
@@ -72,7 +74,7 @@
clients.get(0).gossipPerNodeData(msg());
clients.get(0).gossipSharedData(sharedMsg());
- TUnit.assertThat(()-> {
+ TUnit.assertThat(()-> {
GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a");
if (x == null)
return "";
@@ -80,7 +82,7 @@
return x.getPayload();
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
- TUnit.assertThat(() -> {
+ TUnit.assertThat(() -> {
SharedGossipDataMessage x = clients.get(1).findSharedData("a");
if (x == null)
return "";
@@ -96,11 +98,49 @@
dropIt(clients);
assertThatOrSetDelIsMerged(clients);
+ // test g counter
+ givenDifferentIncrement(clients);
+ assertThatCountIsUpdated(clients, 3);
+ givenIncreaseOther(clients);
+ assertThatCountIsUpdated(clients, 7);
+
for (int i = 0; i < clusterMembers; ++i) {
clients.get(i).shutdown();
}
}
+ private void givenDifferentIncrement(final List<GossipService> clients) {
+ {
+ SharedGossipDataMessage d = new SharedGossipDataMessage();
+ d.setKey(gCounterKey);
+ d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0).getGossipManager()).increment(1)));
+ d.setExpireAt(Long.MAX_VALUE);
+ d.setTimestamp(System.currentTimeMillis());
+ clients.get(0).getGossipManager().merge(d);
+ }
+ {
+ SharedGossipDataMessage d = new SharedGossipDataMessage();
+ d.setKey(gCounterKey);
+ d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1).getGossipManager()).increment(2)));
+ d.setExpireAt(Long.MAX_VALUE);
+ d.setTimestamp(System.currentTimeMillis());
+ clients.get(1).getGossipManager().merge(d);
+ }
+ }
+
+ private void givenIncreaseOther(final List<GossipService> clients) {
+ GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).getGossipManager().findCrdt(gCounterKey);
+ GrowOnlyCounter gc2 = new GrowOnlyCounter(gc,
+ new GrowOnlyCounter.Builder(clients.get(1).getGossipManager()).increment(4));
+
+ SharedGossipDataMessage d = new SharedGossipDataMessage();
+ d.setKey(gCounterKey);
+ d.setPayload(gc2);
+ d.setExpireAt(Long.MAX_VALUE);
+ d.setTimestamp(System.currentTimeMillis());
+ clients.get(1).getGossipManager().merge(d);
+ }
+
private void givenOrs(List<GossipService> clients) {
{
SharedGossipDataMessage d = new SharedGossipDataMessage();
@@ -152,6 +192,13 @@
clients.get(1).getGossipManager().merge(CrdtMessage("2"));
}
+ private void assertThatCountIsUpdated(final List<GossipService> clients, int finalCount) {
+ TUnit.assertThat(() -> {
+ return clients.get(0).getGossipManager().findCrdt(gCounterKey);
+ }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter(
+ new GrowOnlyCounter.Builder(clients.get(0).getGossipManager()).increment(finalCount)));
+ }
+
private void assertThatListIsMerged(final List<GossipService> clients){
TUnit.assertThat(() -> {
return clients.get(0).getGossipManager().findCrdt("cr");
@@ -164,7 +211,7 @@
d.setPayload(new GrowOnlySet<String>( Arrays.asList(item)));
d.setExpireAt(Long.MAX_VALUE);
d.setTimestamp(System.currentTimeMillis());
- return d;
+ return d;
}
private GossipDataMessage msg(){
diff --git a/src/test/java/org/apache/gossip/crdt/GrowOnlyCounterTest.java b/src/test/java/org/apache/gossip/crdt/GrowOnlyCounterTest.java
new file mode 100644
index 0000000..3a134af
--- /dev/null
+++ b/src/test/java/org/apache/gossip/crdt/GrowOnlyCounterTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class GrowOnlyCounterTest {
+
+ @Test
+ public void mergeTest() {
+
+ Map<String, Long> node1Counter = new HashMap<>();
+ node1Counter.put("1", 3L);
+ Map<String, Long> node2Counter = new HashMap<>();
+ node2Counter.put("2", 1L);
+ Map<String, Long> node3Counter = new HashMap<>();
+ node3Counter.put("3", 2L);
+
+ GrowOnlyCounter gCounter1 = new GrowOnlyCounter(node1Counter);
+ GrowOnlyCounter gCounter2 = new GrowOnlyCounter(node2Counter);
+ GrowOnlyCounter gCounter3 = new GrowOnlyCounter(node3Counter);
+
+ // After node 2 receive from node 1
+ gCounter2 = gCounter2.merge(gCounter1);
+ Assert.assertEquals(4, (long) gCounter2.value());
+
+ // After node 3 receive from node 1
+ gCounter3 = gCounter3.merge(gCounter1);
+ Assert.assertEquals(5, (long) gCounter3.value());
+
+ // After node 3 receive from node 2
+ gCounter3 = gCounter3.merge(gCounter2);
+ Assert.assertEquals(6, (long) gCounter3.value());
+ }
+}