Merge branch 'GOSSIP-55' of https://github.com/Mirage20/incubator-gossip
diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventConstants.java b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventConstants.java
new file mode 100644
index 0000000..217087f
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventConstants.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.event.data;
+
+public class DataEventConstants {
+  
+  // MetricRegistry
+  public static final String PER_NODE_DATA_SUBSCRIBERS_SIZE
+          = "gossip.event.data.pernode.subscribers.size";
+  public static final String PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE
+          = "gossip.event.data.pernode.subscribers.queue.size";
+  public static final String SHARED_DATA_SUBSCRIBERS_SIZE
+          = "gossip.event.data.shared.subscribers.size";
+  public static final String SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE
+          = "gossip.event.data.shared.subscribers.queue.size";
+  
+  // Thread pool
+  public static final int PER_NODE_DATA_QUEUE_SIZE = 64;
+  public static final int PER_NODE_DATA_CORE_POOL_SIZE = 1;
+  public static final int PER_NODE_DATA_MAX_POOL_SIZE = 30;
+  public static final int PER_NODE_DATA_KEEP_ALIVE_TIME_SECONDS = 1;
+  public static final int SHARED_DATA_QUEUE_SIZE = 64;
+  public static final int SHARED_DATA_CORE_POOL_SIZE = 1;
+  public static final int SHARED_DATA_MAX_POOL_SIZE = 30;
+  public static final int SHARED_DATA_KEEP_ALIVE_TIME_SECONDS = 1;
+  
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventManager.java b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventManager.java
new file mode 100644
index 0000000..3124df1
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventManager.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.event.data;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class DataEventManager {
+  
+  private final List<UpdateNodeDataEventHandler> perNodeDataHandlers;
+  private final BlockingQueue<Runnable> perNodeDataHandlerQueue;
+  private final ExecutorService perNodeDataEventExecutor;
+  private final List<UpdateSharedDataEventHandler> sharedDataHandlers;
+  private final BlockingQueue<Runnable> sharedDataHandlerQueue;
+  private final ExecutorService sharedDataEventExecutor;
+  
+  public DataEventManager(MetricRegistry metrics) {
+    perNodeDataHandlers = new CopyOnWriteArrayList<>();
+    perNodeDataHandlerQueue = new ArrayBlockingQueue<>(DataEventConstants.PER_NODE_DATA_QUEUE_SIZE);
+    perNodeDataEventExecutor = new ThreadPoolExecutor(
+            DataEventConstants.PER_NODE_DATA_CORE_POOL_SIZE,
+            DataEventConstants.PER_NODE_DATA_MAX_POOL_SIZE,
+            DataEventConstants.PER_NODE_DATA_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS,
+            perNodeDataHandlerQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
+    
+    sharedDataHandlers = new CopyOnWriteArrayList<>();
+    sharedDataHandlerQueue = new ArrayBlockingQueue<>(DataEventConstants.SHARED_DATA_QUEUE_SIZE);
+    sharedDataEventExecutor = new ThreadPoolExecutor(DataEventConstants.SHARED_DATA_CORE_POOL_SIZE,
+            DataEventConstants.SHARED_DATA_MAX_POOL_SIZE,
+            DataEventConstants.SHARED_DATA_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS,
+            sharedDataHandlerQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
+    
+    metrics.register(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_SIZE,
+            (Gauge<Integer>) () -> perNodeDataHandlers.size());
+    metrics.register(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE,
+            (Gauge<Integer>) () -> perNodeDataHandlerQueue.size());
+    metrics.register(DataEventConstants.SHARED_DATA_SUBSCRIBERS_SIZE,
+            (Gauge<Integer>) () -> sharedDataHandlers.size());
+    metrics.register(DataEventConstants.SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE,
+            (Gauge<Integer>) () -> sharedDataHandlerQueue.size());
+    
+  }
+  
+  public void notifySharedData(final String key, final Object newValue, final Object oldValue) {
+    sharedDataHandlers.forEach(handler -> sharedDataEventExecutor
+            .execute(() -> handler.onUpdate(key, oldValue, newValue)));
+  }
+  
+  public void notifyPerNodeData(final String nodeId, final String key, final Object newValue,
+          final Object oldValue) {
+    perNodeDataHandlers.forEach(handler -> perNodeDataEventExecutor
+            .execute(() -> handler.onUpdate(nodeId, key, oldValue, newValue)));
+  }
+  
+  public void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler) {
+    perNodeDataHandlers.add(handler);
+  }
+  
+  public void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler) {
+    perNodeDataHandlers.remove(handler);
+  }
+  
+  public int getPerNodeSubscribersSize() {
+    return perNodeDataHandlers.size();
+  }
+  
+  public void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler) {
+    sharedDataHandlers.add(handler);
+  }
+  
+  public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler) {
+    sharedDataHandlers.remove(handler);
+  }
+  
+  public int getSharedDataSubscribersSize() {
+    return sharedDataHandlers.size();
+  }
+  
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateNodeDataEventHandler.java b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateNodeDataEventHandler.java
new file mode 100644
index 0000000..ca88c17
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateNodeDataEventHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.event.data;
+
+/**
+ * Event handler interface for the per node data items.
+ * Classes which implement this interface get notifications when per node data item get changed.
+ */
+public interface UpdateNodeDataEventHandler {
+  
+  /**
+   * This method get called when a per node datum get changed.
+   *
+   * @param nodeId   id of the node that change the value
+   * @param key      key of the datum
+   * @param oldValue previous value of the datum or null if the datum is discovered
+   *                 for the first time
+   * @param newValue updated value of the datum
+   */
+  void onUpdate(String nodeId, String key, Object oldValue, Object newValue);
+  
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateSharedDataEventHandler.java b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateSharedDataEventHandler.java
new file mode 100644
index 0000000..5655732
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateSharedDataEventHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.event.data;
+
+/**
+ * Event handler interface for shared data items.
+ * Classes which implement this interface get notifications when shared data get changed.
+ */
+public interface UpdateSharedDataEventHandler {
+  /**
+   * This method get called when shared data get changed.
+   *
+   * @param key      key of the shared data item
+   * @param oldValue previous value or null if the data is discovered for the first time
+   * @param newValue updated value of the data item
+   */
+  void onUpdate(String key, Object oldValue, Object newValue);
+  
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
index e034432..4167664 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -20,12 +20,18 @@
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
-import org.apache.gossip.Member;
 import org.apache.gossip.LocalMember;
+import org.apache.gossip.Member;
 import org.apache.gossip.RemoteMember;
 import org.apache.gossip.crdt.Crdt;
 import org.apache.gossip.event.GossipState;
-import org.apache.gossip.model.*;
+import org.apache.gossip.event.data.DataEventManager;
+import org.apache.gossip.event.data.UpdateNodeDataEventHandler;
+import org.apache.gossip.event.data.UpdateSharedDataEventHandler;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.Response;
+import org.apache.gossip.model.SharedDataMessage;
 import org.apache.gossip.udp.Trackable;
 import org.apache.log4j.Logger;
 
@@ -55,13 +61,15 @@
   private final Meter messageSerdeException;
   private final Meter tranmissionException;
   private final Meter tranmissionSuccess;
-
+  private final DataEventManager eventManager;
+  
   public GossipCore(GossipManager manager, MetricRegistry metrics){
     this.gossipManager = manager;
     requests = new ConcurrentHashMap<>();
     workQueue = new ArrayBlockingQueue<>(1024);
     perNodeData = new ConcurrentHashMap<>();
     sharedData = new ConcurrentHashMap<>();
+    eventManager = new DataEventManager(metrics);
     metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size());
     metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
     metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() ->  sharedData.size());
@@ -76,6 +84,7 @@
     while (true){
       SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
       if (previous == null){
+        eventManager.notifySharedData(message.getKey(), message.getPayload(), null);
         return;
       }
       if (message.getPayload() instanceof Crdt){
@@ -88,12 +97,17 @@
         merged.setPayload(mergedCrdt);
         boolean replaced = sharedData.replace(message.getKey(), previous, merged);
         if (replaced){
+          if(!merged.getPayload().equals(previous.getPayload())) {
+            eventManager
+                    .notifySharedData(message.getKey(), merged.getPayload(), previous.getPayload());
+          }
           return;
         }
       } else {
         if (previous.getTimestamp() < message.getTimestamp()){
           boolean result = sharedData.replace(message.getKey(), previous, message);
           if (result){
+            eventManager.notifySharedData(message.getKey(), message.getPayload(), previous.getPayload());
             return;
           }
         } else {
@@ -102,7 +116,7 @@
       }
     }
   }
-  
+
   public void addPerNodeData(PerNodeDataMessage message){
     ConcurrentHashMap<String,PerNodeDataMessage> nodeMap = new ConcurrentHashMap<>();
     nodeMap.put(message.getKey(), message);
@@ -111,11 +125,16 @@
       PerNodeDataMessage current = nodeMap.get(message.getKey());
       if (current == null){
         nodeMap.putIfAbsent(message.getKey(), message);
+        eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(), null);
       } else {
         if (current.getTimestamp() < message.getTimestamp()){
           nodeMap.replace(message.getKey(), current, message);
+          eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(),
+                  current.getPayload());
         }
       }
+    } else {
+      eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(), null);
     }
   }
 
@@ -178,7 +197,7 @@
     sendInternal(message, uri);
     if (latchAndBase == null){
       return null;
-    } 
+    }
     
     try {
       boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS);
@@ -297,4 +316,20 @@
       }
     }
   }
+  
+  void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
+    eventManager.registerPerNodeDataSubscriber(handler);
+  }
+  
+  void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler){
+    eventManager.registerSharedDataSubscriber(handler);
+  }
+  
+  void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
+    eventManager.unregisterPerNodeDataSubscriber(handler);
+  }
+  
+  void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){
+    eventManager.unregisterSharedDataSubscriber(handler);
+  }
 }
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
index 133a79f..d839b2e 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -26,6 +26,8 @@
 import org.apache.gossip.crdt.Crdt;
 import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.event.GossipState;
+import org.apache.gossip.event.data.UpdateNodeDataEventHandler;
+import org.apache.gossip.event.data.UpdateSharedDataEventHandler;
 import org.apache.gossip.manager.handlers.MessageHandler;
 import org.apache.gossip.model.PerNodeDataMessage;
 import org.apache.gossip.model.SharedDataMessage;
@@ -348,4 +350,20 @@
     return new File(manager.getSettings().getPathToDataState(), "pernodedata."
             + manager.getMyself().getClusterName() + "." + manager.getMyself().getId() + ".json");
   }
+  
+  public void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
+    gossipCore.registerPerNodeDataSubscriber(handler);
+  }
+  
+  public void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler){
+    gossipCore.registerSharedDataSubscriber(handler);
+  }
+  
+  public void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
+    gossipCore.unregisterPerNodeDataSubscriber(handler);
+  }
+  
+  public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){
+    gossipCore.unregisterSharedDataSubscriber(handler);
+  }
 }
diff --git a/gossip-base/src/test/java/org/apache/gossip/event/data/DataEventManagerTest.java b/gossip-base/src/test/java/org/apache/gossip/event/data/DataEventManagerTest.java
new file mode 100644
index 0000000..d9d778f
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/event/data/DataEventManagerTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.event.data;
+
+import com.codahale.metrics.MetricRegistry;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(JUnitPlatform.class)
+public class DataEventManagerTest {
+  
+  private static Semaphore semaphore;
+  private String receivedNodeId;
+  private String receivedKey;
+  private Object receivedNewValue;
+  private Object receivedOldValue;
+  
+  @BeforeClass
+  public static void setup() {
+    semaphore = new Semaphore(0);
+  }
+  
+  @Test
+  public void perNodeDataEventHandlerAddRemoveTest() {
+    DataEventManager eventManager = new DataEventManager(new MetricRegistry());
+    
+    UpdateNodeDataEventHandler nodeDataEventHandler = (nodeId, key, oldValue, newValue) -> {
+    };
+    
+    eventManager.registerPerNodeDataSubscriber(nodeDataEventHandler);
+    Assert.assertEquals(1, eventManager.getPerNodeSubscribersSize());
+    eventManager.unregisterPerNodeDataSubscriber(nodeDataEventHandler);
+    Assert.assertEquals(0, eventManager.getPerNodeSubscribersSize());
+  }
+  
+  // Test whether the per node data events are fired for matching key
+  @Test
+  public void perNodeDataEventHandlerTest() throws InterruptedException {
+    DataEventManager eventManager = new DataEventManager(new MetricRegistry());
+    resetData();
+
+    // A new subscriber "Juliet" is like to notified when per node data change for the key "Romeo"
+    UpdateNodeDataEventHandler juliet = (nodeId, key, oldValue, newValue) -> {
+      if(!key.equals("Romeo")) return;
+      receivedNodeId = nodeId;
+      receivedKey = key;
+      receivedNewValue = newValue;
+      receivedOldValue = oldValue;
+      semaphore.release();
+    };
+    // Juliet register with eventManager
+    eventManager.registerPerNodeDataSubscriber(juliet);
+    // Romeo is going to sleep after having dinner
+    eventManager.notifyPerNodeData("Montague", "Romeo", "sleeping", "eating");
+    
+    // Juliet should notified
+    semaphore.tryAcquire(2, TimeUnit.SECONDS);
+    Assert.assertEquals("Montague", receivedNodeId);
+    Assert.assertEquals("Romeo", receivedKey);
+    Assert.assertEquals("sleeping", receivedNewValue);
+    Assert.assertEquals("eating", receivedOldValue);
+    
+    eventManager.unregisterPerNodeDataSubscriber(juliet);
+  }
+  
+  @Test
+  public void sharedDataEventHandlerAddRemoveTest() {
+    DataEventManager eventManager = new DataEventManager(new MetricRegistry());
+    
+    UpdateSharedDataEventHandler sharedDataEventHandler = (key, oldValue, newValue) -> {
+    
+    };
+    eventManager.registerSharedDataSubscriber(sharedDataEventHandler);
+    Assert.assertEquals(1, eventManager.getSharedDataSubscribersSize());
+    eventManager.unregisterSharedDataSubscriber(sharedDataEventHandler);
+    Assert.assertEquals(0, eventManager.getSharedDataSubscribersSize());
+    
+  }
+  
+  // Test whether the shared data events are fired
+  @Test
+  public void sharedDataEventHandlerTest() throws InterruptedException {
+    DataEventManager eventManager = new DataEventManager(new MetricRegistry());
+    resetData();
+    
+    // A new subscriber "Alice" is like to notified when shared data change for the key "technology"
+    UpdateSharedDataEventHandler alice = (key, oldValue, newValue) -> {
+      if(!key.equals("technology")) return;
+      receivedKey = key;
+      receivedNewValue = newValue;
+      receivedOldValue = oldValue;
+      semaphore.release();
+    };
+    // Alice register with eventManager
+    eventManager.registerSharedDataSubscriber(alice);
+    
+    // technology key get changed
+    eventManager.notifySharedData("technology", "Java has lambda", "Java is fast");
+    
+    // Alice should notified
+    semaphore.tryAcquire(2, TimeUnit.SECONDS);
+    Assert.assertEquals("technology", receivedKey);
+    Assert.assertEquals("Java has lambda", receivedNewValue);
+    Assert.assertEquals("Java is fast", receivedOldValue);
+    
+    eventManager.unregisterSharedDataSubscriber(alice);
+  }
+  
+  // Test the MetricRegistry
+  @Test
+  public void metricRegistryTest() {
+    MetricRegistry registry = new MetricRegistry();
+    
+    DataEventManager eventManager = new DataEventManager(registry);
+    
+    UpdateNodeDataEventHandler nodeDataEventHandler = (nodeId, key, oldValue, newValue) -> {
+    };
+    
+    UpdateSharedDataEventHandler sharedDataEventHandler = (key, oldValue, newValue) -> {
+    };
+    
+    eventManager.registerPerNodeDataSubscriber(nodeDataEventHandler);
+    eventManager.registerSharedDataSubscriber(sharedDataEventHandler);
+    
+    Assert.assertEquals(1,
+            registry.getGauges().get(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_SIZE).getValue());
+    Assert.assertEquals(0,
+            registry.getGauges().get(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE)
+                    .getValue());
+    Assert.assertEquals(1,
+            registry.getGauges().get(DataEventConstants.SHARED_DATA_SUBSCRIBERS_SIZE).getValue());
+    Assert.assertEquals(0,
+            registry.getGauges().get(DataEventConstants.SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE)
+                    .getValue());
+    
+  }
+  
+  private void resetData() {
+    receivedNodeId = null;
+    receivedKey = null;
+    receivedNewValue = null;
+    receivedOldValue = null;
+  }
+  
+}
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
index 7d4db93..78c7782 100644
--- a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
@@ -79,10 +79,21 @@
         } else if (op == 'g') {
           gcount(val, gossipService);
         }
+        if (op == 'l') {
+          listen(val, gossipService);
+        }
       }
     }
   }
-
+  
+  private static void listen(String val, GossipManager gossipManager) {
+    gossipManager.registerSharedDataSubscriber((key, oldValue, newValue) -> {
+      if (key.equals(val)) {
+        System.out.println("Event Handler fired! " + oldValue + " " + newValue);
+      }
+    });
+  }
+  
   private static void gcount(String val, GossipManager gossipManager) {
     GrowOnlyCounter c = (GrowOnlyCounter) gossipManager.findCrdt("def");
     Long l = Long.valueOf(val);
diff --git a/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java
new file mode 100644
index 0000000..59136d1
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import io.teknek.tunit.TUnit;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(JUnitPlatform.class)
+public class PerNodeDataEventTest extends AbstractIntegrationBase {
+  
+  private String receivedKey = "";
+  private String receivingNodeId = "";
+  private Object receivingNodeDataNewValue = "";
+  private Object receivingNodeDataOldValue = "";
+  private Semaphore lock = new Semaphore(0);
+  
+  
+  @Test
+  public void perNodeDataEventTest()
+          throws InterruptedException, UnknownHostException, URISyntaxException {
+    GossipSettings settings = new GossipSettings();
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
+    String cluster = UUID.randomUUID().toString();
+    int seedNodes = 1;
+    List<Member> startupMembers = new ArrayList<>();
+    for (int i = 1; i < seedNodes + 1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      startupMembers.add(new RemoteMember(cluster, uri, i + ""));
+    }
+    final List<GossipManager> clients = new ArrayList<>();
+    final int clusterMembers = 2;
+    for (int i = 1; i < clusterMembers + 1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
+              .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
+      clients.add(gossipService);
+      gossipService.init();
+      register(gossipService);
+    }
+    
+    // check whether the members are discovered
+    TUnit.assertThat(() -> {
+      int total = 0;
+      for (int i = 0; i < clusterMembers; ++i) {
+        total += clients.get(i).getLiveMembers().size();
+      }
+      return total;
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
+    
+    // Adding new data to Node 1
+    clients.get(0).gossipPerNodeData(getPerNodeData("category", "distributed"));
+    
+    // Node 2 is interested in data changes for the key "organization" and "category"
+    clients.get(1).registerPerNodeDataSubscriber((nodeId, key, oldValue, newValue) -> {
+      if (!key.equals("organization") && !key.equals("category")) return;
+      receivingNodeId = nodeId;
+      receivedKey = key;
+      receivingNodeDataOldValue = oldValue;
+      receivingNodeDataNewValue = newValue;
+      lock.release();
+    });
+  
+    // Node 2 first time adds Node 1 data
+    lock.tryAcquire(10, TimeUnit.SECONDS);
+    Assert.assertEquals("1", receivingNodeId);
+    Assert.assertEquals("category", receivedKey);
+    Assert.assertEquals(null, receivingNodeDataOldValue);
+    Assert.assertEquals("distributed", receivingNodeDataNewValue);
+  
+    // Node 1 adds new per node data
+    clients.get(0).gossipPerNodeData(getPerNodeData("organization", "apache"));
+    // Node 2 adds new data key from Node 1
+    lock.tryAcquire(10, TimeUnit.SECONDS);
+    Assert.assertEquals("1", receivingNodeId);
+    Assert.assertEquals("organization", receivedKey);
+    Assert.assertEquals(null, receivingNodeDataOldValue);
+    Assert.assertEquals("apache", receivingNodeDataNewValue);
+  
+    // Node 1 updates its value
+    clients.get(0).gossipPerNodeData(getPerNodeData("organization", "apache-gossip"));
+    // Node 2 updates existing value
+    lock.tryAcquire(10, TimeUnit.SECONDS);
+    Assert.assertEquals("1", receivingNodeId);
+    Assert.assertEquals("organization", receivedKey);
+    Assert.assertEquals("apache", receivingNodeDataOldValue);
+    Assert.assertEquals("apache-gossip", receivingNodeDataNewValue);
+    
+  }
+  
+  private PerNodeDataMessage getPerNodeData(String key, String value) {
+    PerNodeDataMessage g = new PerNodeDataMessage();
+    g.setExpireAt(Long.MAX_VALUE);
+    g.setKey(key);
+    g.setPayload(value);
+    g.setTimestamp(System.currentTimeMillis());
+    return g;
+  }
+  
+}
diff --git a/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java b/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java
new file mode 100644
index 0000000..56f1657
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import io.teknek.tunit.TUnit;
+import org.apache.gossip.crdt.GrowOnlyCounter;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.SharedDataMessage;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(JUnitPlatform.class)
+public class SharedDataEventTest extends AbstractIntegrationBase {
+  
+  private String receivedKey = "";
+  private Object receivingNodeDataNewValue = "";
+  private Object receivingNodeDataOldValue = "";
+  private String gCounterKey = "gCounter";
+  private Semaphore lock = new Semaphore(0);
+  
+  @Test
+  public void sharedDataEventTest()
+          throws InterruptedException, UnknownHostException, URISyntaxException {
+    GossipSettings settings = new GossipSettings();
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
+    String cluster = UUID.randomUUID().toString();
+    int seedNodes = 1;
+    List<Member> startupMembers = new ArrayList<>();
+    for (int i = 1; i < seedNodes + 1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      startupMembers.add(new RemoteMember(cluster, uri, i + ""));
+    }
+    final List<GossipManager> clients = new ArrayList<>();
+    final int clusterMembers = 2;
+    for (int i = 1; i < clusterMembers + 1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
+              .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
+      clients.add(gossipService);
+      gossipService.init();
+      register(gossipService);
+    }
+    
+    // check whether the members are discovered
+    TUnit.assertThat(() -> {
+      int total = 0;
+      for (int i = 0; i < clusterMembers; ++i) {
+        total += clients.get(i).getLiveMembers().size();
+      }
+      return total;
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
+    
+    // Adding new data to Node 1
+    clients.get(0).gossipSharedData(sharedNodeData("category", "distributed"));
+    
+    // Node 2 is interested in data changes for the key "organization" and "category"
+    clients.get(1).registerSharedDataSubscriber((key, oldValue, newValue) -> {
+      if (!key.equals("organization") && !key.equals("category"))
+        return;
+      receivedKey = key;
+      receivingNodeDataOldValue = oldValue;
+      receivingNodeDataNewValue = newValue;
+      lock.release();
+    });
+    
+    // Node 2 first time gets shared data
+    lock.tryAcquire(10, TimeUnit.SECONDS);
+    Assert.assertEquals("category", receivedKey);
+    Assert.assertEquals(null, receivingNodeDataOldValue);
+    Assert.assertEquals("distributed", receivingNodeDataNewValue);
+    
+    // Node 1 adds new per node data
+    clients.get(0).gossipSharedData(sharedNodeData("organization", "apache"));
+    // Node 2 adds new shared data
+    lock.tryAcquire(10, TimeUnit.SECONDS);
+    Assert.assertEquals("organization", receivedKey);
+    Assert.assertEquals(null, receivingNodeDataOldValue);
+    Assert.assertEquals("apache", receivingNodeDataNewValue);
+    
+    // Node 1 updates its value
+    clients.get(0).gossipSharedData(sharedNodeData("organization", "apache-gossip"));
+    
+    // Node 2 updates existing value
+    lock.tryAcquire(10, TimeUnit.SECONDS);
+    Assert.assertEquals("organization", receivedKey);
+    Assert.assertEquals("apache", receivingNodeDataOldValue);
+    Assert.assertEquals("apache-gossip", receivingNodeDataNewValue);
+    
+  }
+  
+  @Test
+  public void CrdtDataChangeEventTest()
+          throws InterruptedException, UnknownHostException, URISyntaxException {
+    GossipSettings settings = new GossipSettings();
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
+    String cluster = UUID.randomUUID().toString();
+    int seedNodes = 1;
+    List<Member> startupMembers = new ArrayList<>();
+    for (int i = 1; i < seedNodes + 1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      startupMembers.add(new RemoteMember(cluster, uri, i + ""));
+    }
+    final List<GossipManager> clients = new ArrayList<>();
+    final int clusterMembers = 3;
+    for (int i = 1; i < clusterMembers + 1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
+              .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
+      clients.add(gossipService);
+      gossipService.init();
+      register(gossipService);
+    }
+    
+    // check whether the members are discovered
+    TUnit.assertThat(() -> {
+      int total = 0;
+      for (int i = 0; i < clusterMembers; ++i) {
+        total += clients.get(i).getLiveMembers().size();
+      }
+      return total;
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
+    
+    clients.get(1).registerSharedDataSubscriber((key, oldValue, newValue) -> {
+      receivedKey = key;
+      receivingNodeDataOldValue = oldValue;
+      receivingNodeDataNewValue = newValue;
+      lock.release();
+    });
+    
+    // Add initial gCounter to Node 1
+    SharedDataMessage d = new SharedDataMessage();
+    d.setKey(gCounterKey);
+    d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L)));
+    d.setExpireAt(Long.MAX_VALUE);
+    d.setTimestamp(System.currentTimeMillis());
+    clients.get(0).merge(d);
+    
+    // Check if initial Crdt received
+    lock.tryAcquire(10, TimeUnit.SECONDS);
+    Assert.assertEquals("gCounter", receivedKey);
+    Assert.assertEquals(null, receivingNodeDataOldValue);
+    Assert.assertTrue(receivingNodeDataNewValue instanceof GrowOnlyCounter);
+    Assert.assertEquals(1, ((GrowOnlyCounter) receivingNodeDataNewValue).value().longValue());
+  
+    // Node 3 Updates the gCounter by 4
+    GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(2).findCrdt(gCounterKey);
+    GrowOnlyCounter gcNew = new GrowOnlyCounter(gc,
+            new GrowOnlyCounter.Builder(clients.get(2)).increment(4L));
+  
+    d = new SharedDataMessage();
+    d.setKey(gCounterKey);
+    d.setPayload(gcNew);
+    d.setExpireAt(Long.MAX_VALUE);
+    d.setTimestamp(System.currentTimeMillis());
+    clients.get(2).merge(d);
+  
+    // Check if Node 3's Crdt update is received in Node 2 event handler
+    lock.tryAcquire(10, TimeUnit.SECONDS);
+    Assert.assertEquals("gCounter", receivedKey);
+    Assert.assertTrue(receivingNodeDataOldValue instanceof GrowOnlyCounter);
+    Assert.assertEquals(1, ((GrowOnlyCounter) receivingNodeDataOldValue).value().longValue());
+    Assert.assertTrue(receivingNodeDataNewValue instanceof GrowOnlyCounter);
+    Assert.assertEquals(5, ((GrowOnlyCounter) receivingNodeDataNewValue).value().longValue());
+    
+  }
+  
+  private SharedDataMessage sharedNodeData(String key, String value) {
+    SharedDataMessage g = new SharedDataMessage();
+    g.setExpireAt(Long.MAX_VALUE);
+    g.setKey(key);
+    g.setPayload(value);
+    g.setTimestamp(System.currentTimeMillis());
+    return g;
+  }
+  
+}