GOSSIP-63 Added CRDT G-Counter implementation
diff --git a/src/main/java/org/apache/gossip/crdt/CrdtCounter.java b/src/main/java/org/apache/gossip/crdt/CrdtCounter.java
new file mode 100644
index 0000000..cdc9445
--- /dev/null
+++ b/src/main/java/org/apache/gossip/crdt/CrdtCounter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+public interface CrdtCounter<ValueType extends Number, R extends CrdtCounter<ValueType, R>>
+        extends Crdt<ValueType, R> {
+  
+}
+
diff --git a/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/src/main/java/org/apache/gossip/crdt/CrdtModule.java
index 0c8a787..cfb3f47 100644
--- a/src/main/java/org/apache/gossip/crdt/CrdtModule.java
+++ b/src/main/java/org/apache/gossip/crdt/CrdtModule.java
@@ -42,6 +42,12 @@
   @JsonIgnore abstract boolean isEmpty();
 }
 
+abstract class GrowOnlyCounterMixin {
+  @JsonCreator
+  GrowOnlyCounterMixin(@JsonProperty("counters") Map<String, Long> counters) { }
+  @JsonProperty("counters") abstract Map<String, Long> getCounters();
+}
+
 //If anyone wants to take a stab at this. please have at it
 //https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java
 public class CrdtModule extends SimpleModule {
@@ -56,6 +62,7 @@
   public void setupModule(SetupContext context) {
     context.setMixInAnnotations(OrSet.class, OrSetMixin.class);
     context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class);
+    context.setMixInAnnotations(GrowOnlyCounter.class, GrowOnlyCounterMixin.class);
   }
 
 }
diff --git a/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java b/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java
new file mode 100644
index 0000000..9156142
--- /dev/null
+++ b/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gossip.crdt;
+
+import org.apache.gossip.manager.GossipManager;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class GrowOnlyCounter implements CrdtCounter<Long, GrowOnlyCounter> {
+  
+  private final Map<String, Long> counters = new HashMap<>();
+  
+  GrowOnlyCounter(Map<String, Long> counters) {
+    this.counters.putAll(counters);
+  }
+  
+  public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, Builder builder) {
+    counters.putAll(growOnlyCounter.counters);
+    if (counters.containsKey(builder.myId)) {
+      Long newValue = counters.get(builder.myId) + builder.counter;
+      counters.replace(builder.myId, newValue);
+    } else {
+      counters.put(builder.myId, builder.counter);
+    }
+  }
+  
+  public GrowOnlyCounter(Builder builder) {
+    counters.put(builder.myId, builder.counter);
+  }
+  
+  public GrowOnlyCounter(GossipManager manager) {
+    counters.put(manager.getMyself().getId(), 0L);
+  }
+  
+  public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, GrowOnlyCounter other) {
+    counters.putAll(growOnlyCounter.counters);
+    for (Map.Entry<String, Long> entry : other.counters.entrySet()) {
+      String otherKey = entry.getKey();
+      Long otherValue = entry.getValue();
+      
+      if (counters.containsKey(otherKey)) {
+        Long newValue = Math.max(counters.get(otherKey), otherValue);
+        counters.replace(otherKey, newValue);
+      } else {
+        counters.put(otherKey, otherValue);
+      }
+    }
+  }
+  
+  @Override
+  public GrowOnlyCounter merge(GrowOnlyCounter other) {
+    return new GrowOnlyCounter(this, other);
+  }
+  
+  @Override
+  public Long value() {
+    Long globalCount = 0L;
+    for (Long increment : counters.values()) {
+      globalCount += increment;
+    }
+    return globalCount;
+  }
+  
+  @Override
+  public GrowOnlyCounter optimize() {
+    return new GrowOnlyCounter(counters);
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (getClass() != obj.getClass())
+      return false;
+    GrowOnlyCounter other = (GrowOnlyCounter) obj;
+    return value().longValue() == other.value().longValue();
+  }
+  
+  @Override
+  public String toString() {
+    return "GrowOnlyCounter [counters= " + counters + ", Value=" + value() + "]";
+  }
+  
+  Map<String, Long> getCounters() {
+    return counters;
+  }
+  
+  public static class Builder {
+    
+    private final String myId;
+    
+    private Long counter;
+    
+    public Builder(GossipManager gossipManager) {
+      myId = gossipManager.getMyself().getId();
+      counter = 0L;
+    }
+    
+    public GrowOnlyCounter.Builder increment(Integer count) {
+      counter += count;
+      return this;
+    }
+  }
+}
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
index 3892e9b..a8d57a7 100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -28,6 +28,7 @@
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.gossip.crdt.GrowOnlyCounter;
 import org.apache.gossip.crdt.GrowOnlySet;
 import org.apache.gossip.crdt.OrSet;
 import org.apache.gossip.model.GossipDataMessage;
@@ -39,6 +40,7 @@
 public class DataTest {
   
   private String orSetKey = "cror";
+  private String gCounterKey = "crdtgc";
   
   @Test
   public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
@@ -72,7 +74,7 @@
     clients.get(0).gossipPerNodeData(msg());
     clients.get(0).gossipSharedData(sharedMsg());
 
-    TUnit.assertThat(()-> {      
+    TUnit.assertThat(()-> {
       GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a");
       if (x == null)
         return "";
@@ -80,7 +82,7 @@
         return x.getPayload();
     }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
     
-    TUnit.assertThat(() ->  {    
+    TUnit.assertThat(() ->  {
       SharedGossipDataMessage x = clients.get(1).findSharedData("a");
       if (x == null)
         return "";
@@ -96,11 +98,49 @@
     dropIt(clients);
     assertThatOrSetDelIsMerged(clients);
     
+    // test g counter
+    givenDifferentIncrement(clients);
+    assertThatCountIsUpdated(clients, 3);
+    givenIncreaseOther(clients);
+    assertThatCountIsUpdated(clients, 7);
+
     for (int i = 0; i < clusterMembers; ++i) {
       clients.get(i).shutdown();
     }
   }
   
+  private void givenDifferentIncrement(final List<GossipService> clients) {
+    {
+      SharedGossipDataMessage d = new SharedGossipDataMessage();
+      d.setKey(gCounterKey);
+      d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0).getGossipManager()).increment(1)));
+      d.setExpireAt(Long.MAX_VALUE);
+      d.setTimestamp(System.currentTimeMillis());
+      clients.get(0).getGossipManager().merge(d);
+    }
+    {
+      SharedGossipDataMessage d = new SharedGossipDataMessage();
+      d.setKey(gCounterKey);
+      d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1).getGossipManager()).increment(2)));
+      d.setExpireAt(Long.MAX_VALUE);
+      d.setTimestamp(System.currentTimeMillis());
+      clients.get(1).getGossipManager().merge(d);
+    }
+  }
+
+  private void givenIncreaseOther(final List<GossipService> clients) {
+    GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).getGossipManager().findCrdt(gCounterKey);
+    GrowOnlyCounter gc2 = new GrowOnlyCounter(gc,
+            new GrowOnlyCounter.Builder(clients.get(1).getGossipManager()).increment(4));
+
+    SharedGossipDataMessage d = new SharedGossipDataMessage();
+    d.setKey(gCounterKey);
+    d.setPayload(gc2);
+    d.setExpireAt(Long.MAX_VALUE);
+    d.setTimestamp(System.currentTimeMillis());
+    clients.get(1).getGossipManager().merge(d);
+  }
+
   private void givenOrs(List<GossipService> clients) {
     {
       SharedGossipDataMessage d = new SharedGossipDataMessage();
@@ -152,6 +192,13 @@
     clients.get(1).getGossipManager().merge(CrdtMessage("2"));
   }
   
+  private void assertThatCountIsUpdated(final List<GossipService> clients, int finalCount) {
+    TUnit.assertThat(() -> {
+      return clients.get(0).getGossipManager().findCrdt(gCounterKey);
+    }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter(
+            new GrowOnlyCounter.Builder(clients.get(0).getGossipManager()).increment(finalCount)));
+  }
+
   private void assertThatListIsMerged(final List<GossipService> clients){
     TUnit.assertThat(() ->  {
       return clients.get(0).getGossipManager().findCrdt("cr");
@@ -164,7 +211,7 @@
     d.setPayload(new GrowOnlySet<String>( Arrays.asList(item)));
     d.setExpireAt(Long.MAX_VALUE);
     d.setTimestamp(System.currentTimeMillis());
-    return d;  
+    return d;
   }
   
   private GossipDataMessage msg(){
diff --git a/src/test/java/org/apache/gossip/crdt/GrowOnlyCounterTest.java b/src/test/java/org/apache/gossip/crdt/GrowOnlyCounterTest.java
new file mode 100644
index 0000000..3a134af
--- /dev/null
+++ b/src/test/java/org/apache/gossip/crdt/GrowOnlyCounterTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class GrowOnlyCounterTest {
+  
+  @Test
+  public void mergeTest() {
+    
+    Map<String, Long> node1Counter = new HashMap<>();
+    node1Counter.put("1", 3L);
+    Map<String, Long> node2Counter = new HashMap<>();
+    node2Counter.put("2", 1L);
+    Map<String, Long> node3Counter = new HashMap<>();
+    node3Counter.put("3", 2L);
+    
+    GrowOnlyCounter gCounter1 = new GrowOnlyCounter(node1Counter);
+    GrowOnlyCounter gCounter2 = new GrowOnlyCounter(node2Counter);
+    GrowOnlyCounter gCounter3 = new GrowOnlyCounter(node3Counter);
+    
+    // After node 2 receive from node 1
+    gCounter2 = gCounter2.merge(gCounter1);
+    Assert.assertEquals(4, (long) gCounter2.value());
+    
+    // After node 3 receive from node 1
+    gCounter3 = gCounter3.merge(gCounter1);
+    Assert.assertEquals(5, (long) gCounter3.value());
+    
+    // After node 3 receive from node 2
+    gCounter3 = gCounter3.merge(gCounter2);
+    Assert.assertEquals(6, (long) gCounter3.value());
+  }
+}