Merge branch 'master' of https://github.com/pxsalehi/incubator-gossip
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
index ab5cefa..396ec03 100644
--- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
@@ -20,9 +20,15 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.core.Version;
 import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.replication.BlackListReplicable;
+import org.apache.gossip.replication.Replicable;
+import org.apache.gossip.replication.WhiteListReplicable;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -81,6 +87,26 @@
   @JsonProperty("n-counters") abstract Map<String, Long> getNCounters();
 }
 
+@JsonTypeInfo(
+        use = JsonTypeInfo.Id.CLASS,
+        include = JsonTypeInfo.As.PROPERTY,
+        property = "type")
+abstract class ReplicableMixin {
+
+}
+
+abstract class WhiteListReplicableMixin {
+  @JsonCreator
+  WhiteListReplicableMixin(@JsonProperty("whiteListMembers") List<LocalMember> whiteListMembers) { }
+  @JsonProperty("whiteListMembers") abstract List<LocalMember> getWhiteListMembers();
+}
+
+abstract class BlackListReplicableMixin {
+  @JsonCreator
+  BlackListReplicableMixin(@JsonProperty("blackListMembers") List<LocalMember> blackListMembers) { }
+  @JsonProperty("blackListMembers") abstract List<LocalMember> getBlackListMembers();
+}
+
 //If anyone wants to take a stab at this. please have at it
 //https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java
 public class CrdtModule extends SimpleModule {
@@ -101,6 +127,9 @@
     context.setMixInAnnotations(LwwSet.Timestamps.class, LWWSetTimestampsMixin.class);
     context.setMixInAnnotations(MaxChangeSet.class, MaxChangeSetMixin.class);
     context.setMixInAnnotations(TwoPhaseSet.class, TwoPhaseSetMixin.class);
+    context.setMixInAnnotations(Replicable.class, ReplicableMixin.class);
+    context.setMixInAnnotations(WhiteListReplicable.class, WhiteListReplicableMixin.class);
+    context.setMixInAnnotations(BlackListReplicable.class, BlackListReplicableMixin.class);
   }
 
 }
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
index b73550e..adf2530 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
@@ -85,6 +85,10 @@
     }
     long startTime = System.currentTimeMillis();
     for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
+      if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
+              .shouldReplicate(me, member, innerEntry.getValue())) {
+        continue;
+      }
       UdpSharedDataMessage message = new UdpSharedDataMessage();
       message.setUuid(UUID.randomUUID().toString());
       message.setUriFrom(me.getId());
@@ -93,6 +97,7 @@
       message.setNodeId(innerEntry.getValue().getNodeId());
       message.setTimestamp(innerEntry.getValue().getTimestamp());
       message.setPayload(innerEntry.getValue().getPayload());
+      message.setReplicable(innerEntry.getValue().getReplicable());
       gossipCore.sendOneWay(message, member.getUri());
     }
     sharedDataHistogram.update(System.currentTimeMillis() - startTime);
@@ -105,6 +110,10 @@
     long startTime = System.currentTimeMillis();
     for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
       for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()){
+        if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
+                .shouldReplicate(me, member, innerEntry.getValue())) {
+          continue;
+        }
         UdpPerNodeDataMessage message = new UdpPerNodeDataMessage();
         message.setUuid(UUID.randomUUID().toString());
         message.setUriFrom(me.getId());
@@ -113,6 +122,7 @@
         message.setNodeId(innerEntry.getValue().getNodeId());
         message.setTimestamp(innerEntry.getValue().getTimestamp());
         message.setPayload(innerEntry.getValue().getPayload());
+        message.setReplicable(innerEntry.getValue().getReplicable());
         gossipCore.sendOneWay(message, member.getUri());   
       }
     }
diff --git a/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java b/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java
index 2d1cdef..2394e76 100644
--- a/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java
+++ b/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java
@@ -17,6 +17,8 @@
  */
 package org.apache.gossip.model;
 
+import org.apache.gossip.replication.Replicable;
+
 public class PerNodeDataMessage extends Base {
 
   private String nodeId;
@@ -24,7 +26,8 @@
   private Object payload;
   private Long timestamp;
   private Long expireAt;
-  
+  private Replicable<PerNodeDataMessage> replicable;
+
   public String getNodeId() {
     return nodeId;
   }
@@ -55,10 +58,20 @@
   public void setExpireAt(Long expireAt) {
     this.expireAt = expireAt;
   }
+
+  public Replicable<PerNodeDataMessage> getReplicable() {
+    return replicable;
+  }
+
+  public void setReplicable(Replicable<PerNodeDataMessage> replicable) {
+    this.replicable = replicable;
+  }
+
   @Override
   public String toString() {
     return "GossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload
-            + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]";
+            + ", timestamp=" + timestamp + ", expireAt=" + expireAt
+            + ", replicable=" + replicable + "]";
   }
 
   
diff --git a/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java
index e423be8..4b1a1ea 100644
--- a/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java
+++ b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java
@@ -17,6 +17,9 @@
  */
 package org.apache.gossip.model;
 
+import org.apache.gossip.replication.AllReplicable;
+import org.apache.gossip.replication.Replicable;
+
 public class SharedDataMessage extends Base {
 
   private String nodeId;
@@ -24,6 +27,7 @@
   private Object payload;
   private Long timestamp;
   private Long expireAt;
+  private Replicable<SharedDataMessage> replicable;
   
   public String getNodeId() {
     return nodeId;
@@ -55,10 +59,20 @@
   public void setExpireAt(Long expireAt) {
     this.expireAt = expireAt;
   }
+  
+  public Replicable<SharedDataMessage> getReplicable() {
+    return replicable;
+  }
+  
+  public void setReplicable(Replicable<SharedDataMessage> replicable) {
+    this.replicable = replicable;
+  }
+  
   @Override
   public String toString() {
     return "SharedGossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload
-            + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]";
-  }  
+            + ", timestamp=" + timestamp + ", expireAt=" + expireAt
+            + ", replicable=" + replicable + "]";
+  }
 }
 
diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/AllReplicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/AllReplicable.java
new file mode 100644
index 0000000..573fd25
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/replication/AllReplicable.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.replication;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.model.Base;
+
+/**
+ * Replicable implementation which replicates data to any node. This is the default replication
+ * strategy if a data item does not specify its replication behaviour.
+ *
+ * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface
+ * @see Replicable
+ */
+public class AllReplicable<T extends Base> implements Replicable<T> {
+  
+  @Override
+  public boolean shouldReplicate(LocalMember me, LocalMember destination, T message) {
+    return true;
+  }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/BlackListReplicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/BlackListReplicable.java
new file mode 100644
index 0000000..33e1706
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/replication/BlackListReplicable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.replication;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.model.Base;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Replicable implementation which does not replicate data to a given set of nodes.
+ *
+ * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface
+ * @see Replicable
+ */
+public class BlackListReplicable<T extends Base> implements Replicable<T> {
+  
+  private final List<LocalMember> blackListMembers;
+  
+  public BlackListReplicable(List<LocalMember> blackListMembers) {
+    if (blackListMembers == null) {
+      this.blackListMembers = new ArrayList<>();
+    } else {
+      this.blackListMembers = blackListMembers;
+    }
+  }
+
+  public List<LocalMember> getBlackListMembers() {
+    return blackListMembers;
+  }
+
+  @Override
+  public boolean shouldReplicate(LocalMember me, LocalMember destination, T message) {
+    return !blackListMembers.contains(destination);
+  }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/DataCenterReplicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/DataCenterReplicable.java
new file mode 100644
index 0000000..1067c49
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/replication/DataCenterReplicable.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.replication;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
+import org.apache.gossip.model.Base;
+
+/**
+ * Replicable implementation which replicates data only within the same data center.
+ *
+ * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface
+ * @see Replicable
+ */
+public class DataCenterReplicable<T extends Base> implements Replicable<T> {
+  
+  @Override
+  public boolean shouldReplicate(LocalMember me, LocalMember destination, T message) {
+    if (!me.getProperties().containsKey(DatacenterRackAwareActiveGossiper.DATACENTER)) {
+      // replicate to others if I do not belong to any data center
+      return true;
+    } else if (!destination.getProperties()
+            .containsKey(DatacenterRackAwareActiveGossiper.DATACENTER)) {
+      // Do not replicate if the destination data center is not defined
+      return false;
+    } else {
+      return me.getProperties().get(DatacenterRackAwareActiveGossiper.DATACENTER)
+              .equals(destination.getProperties().get(DatacenterRackAwareActiveGossiper.DATACENTER));
+    }
+  }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/NotReplicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/NotReplicable.java
new file mode 100644
index 0000000..c3fa538
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/replication/NotReplicable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.replication;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.model.Base;
+
+/**
+ * Replicable implementation which never replicates data to any node.
+ *
+ * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface
+ * @see Replicable
+ */
+public class NotReplicable<T extends Base> implements Replicable<T> {
+  
+  @Override
+  public boolean shouldReplicate(LocalMember me, LocalMember destination, T message) {
+    return false;
+  }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/Replicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/Replicable.java
new file mode 100644
index 0000000..68098df
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/replication/Replicable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.replication;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.model.Base;
+
+/**
+ * This interface is used to determine whether a data item needs to be replicated to
+ * another gossip member.
+ *
+ * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface
+ */
+public interface Replicable<T extends Base> {
+  /**
+   * Tests whether a given data item needs to be replicated to the destination node.
+   * @param me node that the data item is going to be transmitted from.
+   * @param destination target node to replicate to.
+   * @param message the data item being considered; this parameter is currently ignored
+   * @return true if the data item needs to be replicated to the destination. Otherwise false.
+   */
+  boolean shouldReplicate(LocalMember me, LocalMember destination, T message);
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/WhiteListReplicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/WhiteListReplicable.java
new file mode 100644
index 0000000..299d929
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/replication/WhiteListReplicable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.replication;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.model.Base;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Replicable implementation which replicates data only to a given set of nodes.
+ *
+ * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface
+ * @see Replicable
+ */
+public class WhiteListReplicable<T extends Base> implements Replicable<T> {
+  
+  private final List<LocalMember> whiteListMembers;
+  
+  public WhiteListReplicable(List<LocalMember> whiteListMembers) {
+    if (whiteListMembers == null) {
+      this.whiteListMembers = new ArrayList<>();
+    } else {
+      this.whiteListMembers = whiteListMembers;
+    }
+  }
+
+  public List<LocalMember> getWhiteListMembers() {
+    return whiteListMembers;
+  }
+
+  @Override
+  public boolean shouldReplicate(LocalMember me, LocalMember destination, T message) {
+    return whiteListMembers.contains(destination);
+  }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java b/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java
index 6eb170a..9ba1e85 100644
--- a/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java
+++ b/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java
@@ -42,7 +42,8 @@
 
   @Override
   public String toString() {
-    return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]";
+    return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid
+            + ", getReplicable()=" + getReplicable() + "]";
   }
 
 }
diff --git a/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java b/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java
index 1658503..0059bdb 100644
--- a/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java
+++ b/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java
@@ -44,7 +44,8 @@
   public String toString() {
     return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + ", getNodeId()="
             + getNodeId() + ", getKey()=" + getKey() + ", getPayload()=" + getPayload()
-            + ", getTimestamp()=" + getTimestamp() + ", getExpireAt()=" + getExpireAt() + "]";
+            + ", getTimestamp()=" + getTimestamp() + ", getExpireAt()=" + getExpireAt()
+            + ", getReplicable()=" + getReplicable() + "]";
   }
 
 }
diff --git a/gossip-base/src/test/java/org/apache/gossip/AbstractIntegrationBase.java b/gossip-base/src/test/java/org/apache/gossip/AbstractIntegrationBase.java
index 896157f..d074706 100644
--- a/gossip-base/src/test/java/org/apache/gossip/AbstractIntegrationBase.java
+++ b/gossip-base/src/test/java/org/apache/gossip/AbstractIntegrationBase.java
@@ -18,24 +18,52 @@
 
 package org.apache.gossip;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
 import org.junit.After;
 import org.junit.Before;
 
 public abstract class AbstractIntegrationBase {
 
-  List <GossipManager> nodes = new ArrayList<GossipManager>();
+  List <GossipManager> nodes = new ArrayList<>();
   
   public void register(GossipManager manager){
     nodes.add(manager);
   }
-    
+
+  public void generateStandardNodes(final int memberCount) throws URISyntaxException {
+    if(nodes.size() > 0){
+      after();
+      nodes.clear();
+    }
+    GossipSettings settings = new GossipSettings();
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
+    String cluster = UUID.randomUUID().toString();
+    int seedNodes = 1;
+    List<Member> startupMembers = new ArrayList<>();
+    for (int i = 1; i < seedNodes + 1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      startupMembers.add(new RemoteMember(cluster, uri, i + ""));
+    }
+
+    for (int i = 1; i < memberCount + 1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
+              .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
+      gossipService.init();
+      register(gossipService);
+    }
+  }
   @Before
   public void before(){
-    nodes = new ArrayList<GossipManager>();
+    nodes = new ArrayList<>();
   }
   
   @After
diff --git a/gossip-base/src/test/java/org/apache/gossip/replication/DataReplicationTest.java b/gossip-base/src/test/java/org/apache/gossip/replication/DataReplicationTest.java
new file mode 100644
index 0000000..dd073a8
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/replication/DataReplicationTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.replication;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
+import org.apache.gossip.model.SharedDataMessage;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(JUnitPlatform.class)
+public class DataReplicationTest {
+  
+  @Test
+  public void dataReplicateAllTest() throws URISyntaxException {
+    SharedDataMessage message =  getSharedNodeData("public","public", new AllReplicable<>());
+    LocalMember me = getLocalMember(new URI("udp://127.0.0.1:8001"),"1");
+    LocalMember member = getLocalMember(new URI("udp://127.0.0.1:8002"),"2");
+    Assert.assertEquals(true, message.getReplicable().shouldReplicate(me, member, message));
+  }
+  
+  @Test
+  public void dataReplicateNoneTest() throws URISyntaxException {
+    SharedDataMessage message =  getSharedNodeData("private","private", new NotReplicable<>());
+    LocalMember me = getLocalMember(new URI("udp://127.0.0.1:8001"),"1");
+    LocalMember member = getLocalMember(new URI("udp://127.0.0.1:8002"),"2");
+    Assert.assertEquals(false, message.getReplicable().shouldReplicate(me, member, message));
+  }
+  
+  @Test
+  public void dataReplicateWhiteListTest() throws URISyntaxException {
+    List<LocalMember> memberList = new ArrayList<>();
+    memberList.add(getLocalMember(new URI("udp://127.0.0.1:8001"),"1"));
+    memberList.add(getLocalMember(new URI("udp://127.0.0.1:8002"),"2"));
+    memberList.add(getLocalMember(new URI("udp://127.0.0.1:8003"),"3"));
+    // add node 1 and 2 to the white list
+    List<LocalMember> whiteList = new ArrayList<>();
+    whiteList.add(memberList.get(0));
+    whiteList.add(memberList.get(1));
+
+    SharedDataMessage message = getSharedNodeData("whiteList", "Only allow some nodes",
+            new WhiteListReplicable<>(whiteList));
+    LocalMember me =  getLocalMember(new URI("udp://127.0.0.1:8004"),"4");
+    
+    // data should replicate to node 1 and 2 but not 3
+    Assert.assertEquals(true,
+            message.getReplicable().shouldReplicate(me, memberList.get(0), message));
+    Assert.assertEquals(true,
+            message.getReplicable().shouldReplicate(me, memberList.get(1), message));
+    Assert.assertEquals(false,
+            message.getReplicable().shouldReplicate(me, memberList.get(2), message));
+  }
+  
+  @Test
+  public void dataReplicateWhiteListNullTest() throws URISyntaxException {
+    List<LocalMember> memberList = new ArrayList<>();
+    memberList.add(getLocalMember(new URI("udp://127.0.0.1:8001"),"1"));
+    memberList.add(getLocalMember(new URI("udp://127.0.0.1:8002"),"2"));
+
+    SharedDataMessage message = getSharedNodeData("whiteList", "Only allow some nodes",
+            new WhiteListReplicable<>(null));
+
+    // data should not replicate if no whitelist specified
+    Assert.assertEquals(false,
+            message.getReplicable().shouldReplicate(memberList.get(0), memberList.get(1), message));
+    Assert.assertEquals(false,
+            message.getReplicable().shouldReplicate(memberList.get(1), memberList.get(0), message));
+
+  }
+  
+  @Test
+  public void dataReplicateBlackListTest() throws URISyntaxException {
+    List<LocalMember> memberList = new ArrayList<>();
+    memberList.add(getLocalMember(new URI("udp://127.0.0.1:8001"),"1"));
+    memberList.add(getLocalMember(new URI("udp://127.0.0.1:8002"),"2"));
+    memberList.add(getLocalMember(new URI("udp://127.0.0.1:8003"),"3"));
+    // add node 1 and 2 to the black list
+    List<LocalMember> blackList = new ArrayList<>();
+    blackList.add(memberList.get(0));
+    blackList.add(memberList.get(1));
+    
+    SharedDataMessage message = getSharedNodeData("blackList", "Disallow some nodes",
+            new BlackListReplicable<>(blackList));
+    LocalMember me = getLocalMember(new URI("udp://127.0.0.1:8004"),"4");
+
+    // data should not replicate to node 1 and 2
+    Assert.assertEquals(false,
+            message.getReplicable().shouldReplicate(me, memberList.get(0), message));
+    Assert.assertEquals(false,
+            message.getReplicable().shouldReplicate(me, memberList.get(1), message));
+    Assert.assertEquals(true,
+            message.getReplicable().shouldReplicate(me, memberList.get(2), message));
+  }
+  
+  @Test
+  public void dataReplicateBlackListNullTest() throws URISyntaxException {
+    
+    List<LocalMember> memberList = new ArrayList<>();
+    memberList.add(getLocalMember(new URI("udp://127.0.0.1:8001"),"1"));
+    memberList.add(getLocalMember(new URI("udp://127.0.0.1:8002"),"2"));
+    
+    SharedDataMessage message = getSharedNodeData("blackList", "Disallow some nodes",
+            new BlackListReplicable<>(null));
+    
+    // data should replicate if no blacklist specified
+    Assert.assertEquals(true,
+            message.getReplicable().shouldReplicate(memberList.get(0), memberList.get(1), message));
+    Assert.assertEquals(true,
+            message.getReplicable().shouldReplicate(memberList.get(1), memberList.get(0), message));
+  }
+  
+  @Test
+  public void dataReplicateDataCenterTest() throws URISyntaxException {
+    
+    List<LocalMember> memberListDc1 = new ArrayList<>();
+    List<LocalMember> memberListDc2 = new ArrayList<>();
+
+    memberListDc1
+            .add(getLocalMemberDc(new URI("udp://10.0.0.1:8000"), "1", "DataCenter1", "Rack1"));
+    memberListDc1
+            .add(getLocalMemberDc(new URI("udp://10.0.0.2:8000"), "2", "DataCenter1", "Rack2"));
+    memberListDc2
+            .add(getLocalMemberDc(new URI("udp://10.0.1.1:8000"), "11", "DataCenter2", "Rack1"));
+    memberListDc2
+            .add(getLocalMemberDc(new URI("udp://10.0.1.2:8000"), "12", "DataCenter2", "Rack2"));
+
+    SharedDataMessage message = getSharedNodeData("datacenter1", "I am in data center 1 rack 1",
+            new DataCenterReplicable<>());
+
+    // data should replicate between nodes of the same data center (DC1 -> DC1, DC2 -> DC2)
+    Assert.assertEquals(true, message.getReplicable()
+            .shouldReplicate(memberListDc1.get(0), memberListDc1.get(1), message));
+    Assert.assertEquals(true, message.getReplicable()
+            .shouldReplicate(memberListDc2.get(0), memberListDc2.get(1), message));
+    
+    // data should not replicate to data center 2
+    Assert.assertEquals(false, message.getReplicable()
+            .shouldReplicate(memberListDc1.get(0), memberListDc2.get(0), message));
+    Assert.assertEquals(false, message.getReplicable()
+            .shouldReplicate(memberListDc1.get(1), memberListDc2.get(1), message));
+  }
+  
+  @Test
+  public void dataReplicateDataCenterUnknownDataCenterTest() throws URISyntaxException {
+    
+    List<LocalMember> memberListDc1 = new ArrayList<>();
+    memberListDc1
+            .add(getLocalMemberDc(new URI("udp://10.0.0.1:8000"), "1", "DataCenter1", "Rack1"));
+
+    Map<String, String> properties = new HashMap<>();
+    LocalMember unknownDc = new LocalMember("cluster1", new URI("udp://10.0.1.2:8000"), "12", 0,
+            properties, 1, 0, "");
+    
+    SharedDataMessage message = getSharedNodeData("datacenter1","I am in data center 1 rack 1", new DataCenterReplicable<>());
+
+    // data should not replicate from dc1 to unknown node
+    Assert.assertEquals(false, message.getReplicable()
+            .shouldReplicate(memberListDc1.get(0), unknownDc, message));
+    // data can replicate from unknown node to dc
+    Assert.assertEquals(true, message.getReplicable()
+            .shouldReplicate(unknownDc, memberListDc1.get(0), message));
+
+  }
+
+  private static SharedDataMessage getSharedNodeData(String key, String value,
+          Replicable<SharedDataMessage> replicable) {
+    SharedDataMessage g = new SharedDataMessage();
+    g.setExpireAt(Long.MAX_VALUE);
+    g.setKey(key);
+    g.setPayload(value);
+    g.setTimestamp(System.currentTimeMillis());
+    g.setReplicable(replicable);
+    return g;
+  }
+
+  private static LocalMember getLocalMember(URI uri, String id){
+    return new LocalMember("cluster1", uri, id, 0, null, 1, 0, "");
+  }
+
+  private static LocalMember getLocalMemberDc(URI uri, String id, String dataCenter, String rack){
+    Map<String, String> props = new HashMap<>();
+    props.put(DatacenterRackAwareActiveGossiper.DATACENTER, dataCenter);
+    props.put(DatacenterRackAwareActiveGossiper.RACK, rack);
+    return new LocalMember("cluster1", uri, id, 0, props, 1, 0, "");
+  }
+}
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/ExampleCommon.java b/gossip-examples/src/main/java/org/apache/gossip/examples/ExampleCommon.java
deleted file mode 100644
index 279bff1..0000000
--- a/gossip-examples/src/main/java/org/apache/gossip/examples/ExampleCommon.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.examples;
-
-public class ExampleCommon {
-
-  private boolean clearTerminalScreen = true;
-
-  /*
-   * Look for -s in args. If there, suppress terminal-clear on write results Shift args for
-   * positional args, if necessary
-   */
-  public String[] checkArgsForClearFlag(String[] args) {
-    int pos = 0;
-    for (int i = 0; i < args.length; i++) {
-      if (args[i].equals("-s")) {
-        clearTerminalScreen = false;
-      } else {
-        // in the case of the -s flag, shift args 
-        // down by one slot; this will end up with
-        // a duplicate entry in the last position of args,
-        // but this is ok, because it will be ignored
-        args[pos++] = args[i];
-      }
-    }
-    return args;
-  }
-
-  public void optionallyClearTerminal() {
-    if (clearTerminalScreen) {
-      System.out.print("\033[H\033[2J");
-      System.out.flush();
-    }
-  }
-}
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/RunStandardExamples.java b/gossip-examples/src/main/java/org/apache/gossip/examples/RunStandardExamples.java
new file mode 100644
index 0000000..21861bb
--- /dev/null
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/RunStandardExamples.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.examples;
+
+import java.io.IOException;
+
+public class RunStandardExamples {
+
+  private static final boolean WILL_READ = true;
+
+  private static final boolean WILL_NOT_READ = false;
+
+  /** Runs example args[0] (1-4) on channel args[1] (0-2); prints usage on bad arguments. */
+  public static void main(String[] args) {
+    if ((args.length < 2) || args[0].equals("-h") || args[0].equals("--help")) {
+      System.out.print(usage());
+      return;
+    }
+    try {
+      int example = intFromString(args[0]);
+      int channel = intFromString(args[1]);
+      if ((example < 1) || (example > 4) || (channel < 0) || (channel > 2)) {
+        System.out.print(usage());
+        return;
+      }
+      runExample(example, channel);
+    } catch (NumberFormatException e) {
+      System.out.print(usage());
+    } catch (IOException e) {
+      // the example itself failed; printing usage here would hide the real cause
+      System.err.println("Example failed: " + e);
+    }
+  }
+
+  private static void runExample(int exampleNumber, int channel) throws IOException {
+    String[] args = standardArgs(channel, new String[4]);
+    if (exampleNumber == 1) {
+      new StandAloneNode(args).exec(WILL_NOT_READ);
+    } else if (exampleNumber == 2) {
+      new StandAloneNodeCrdtOrSet(args).exec(WILL_READ);
+    } else if (exampleNumber == 3) {
+      new StandAlonePNCounter(args).exec(WILL_READ);
+    } else if (exampleNumber == 4) {
+      // this example reads two extra positional args, so it needs the longer array
+      new StandAloneDatacenterAndRack(extendedArgs(channel, new String[6])).exec(WILL_READ);
+    }
+  }
+
+  /** Fills args[0..3]: own URI, own id, then the URI and id of the remote member. */
+  private static String[] standardArgs(int channel, String[] args) {
+    // see README.md for examples
+    args[0] = "udp://localhost:1000" + channel;
+    args[1] = "" + channel;
+    args[2] = "udp://localhost:10000";
+    args[3] = "0";
+    return args;
+  }
+
+  /** Adds args[4] and args[5], which StandAloneDatacenterAndRack uses as data center and rack. */
+  private static String[] extendedArgs(int channel, String[] args) {
+    args = standardArgs(channel, args);
+    // see README.md for examples
+    if (channel == 0) {
+      args[4] = "1";
+      args[5] = "2";
+    } else if (channel == 1) {
+      args[4] = "1";
+      args[5] = "3";
+    } else if (channel == 2) {
+      args[4] = "2";
+      args[5] = "2";
+    }
+    return args;
+  }
+
+  private static int intFromString(String string) {
+    return Integer.parseInt(string);
+  }
+
+  private static String usage() {
+    return "Select and run (usually in a separate terminal window) \n"
+            + "one of the standard Examples,\n" + " 1. StandAloneNode\n"
+            + " 2. StandAloneNodeCrdtOrSet\n" + " 3. StandAlonePNCounter\n"
+            + " 4. StandAloneDatacenterAndRack\n" + "(See README.md in this module)\n" + "\n"
+            + "Usage: mvn exec:java -Dexec.mainClass=org.apache.gossip.examples.RunStandardExamples  -Dexec.args=\"s c\"\n"
+            + "where...\n" + "  s - int - the example number from above\n"
+            + "  c - int - the channel number: 0, 1, or 2\n";
+  }
+
+}
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
index 1a2643c..2336e87 100644
--- a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
@@ -18,8 +18,8 @@
 
 package org.apache.gossip.examples;
 
+import java.io.IOException;
 import java.net.URI;
-import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,12 +30,20 @@
 import org.apache.gossip.manager.GossipManager;
 import org.apache.gossip.manager.GossipManagerBuilder;
 
-public class StandAloneDatacenterAndRack {
+public class StandAloneDatacenterAndRack extends StandAloneExampleBase {
 
-  private static ExampleCommon common = new ExampleCommon();
+  public static void main(String[] args) throws InterruptedException, IOException {
+    StandAloneDatacenterAndRack example = new StandAloneDatacenterAndRack(args);
+    boolean willRead = true;
+    example.exec(willRead);
+  }
 
-  public static void main(String[] args) throws UnknownHostException, InterruptedException {
-    args = common.checkArgsForClearFlag(args);
+  StandAloneDatacenterAndRack(String[] args) {
+    args = super.checkArgsForClearFlag(args);
+    initGossipManager(args);
+  }
+
+  void initGossipManager(String[] args) {
     GossipSettings s = new GossipSettings();
     s.setWindowSize(1000);
     s.setGossipInterval(100);
@@ -48,20 +56,17 @@
     props.put(DatacenterRackAwareActiveGossiper.DATACENTER, args[4]);
     props.put(DatacenterRackAwareActiveGossiper.RACK, args[5]);
     GossipManager manager = GossipManagerBuilder.newBuilder().cluster("mycluster")
-            .uri(URI.create(args[0]))
-            .id(args[1])
-            .gossipSettings(s)
+            .uri(URI.create(args[0])).id(args[1]).gossipSettings(s)
             .gossipMembers(
                     Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3])))
-            .properties(props)
-            .build();
+            .properties(props).build();
     manager.init();
-    while (true) {
-      common.optionallyClearTerminal();
-      System.out.println("Live: " + manager.getLiveMembers());
-      System.out.println("Dead: " + manager.getDeadMembers());
-      Thread.sleep(2000);
-    }
+    setGossipService(manager);
+  }
+
+  @Override
+  void printValues(GossipManager gossipService) {
+    // intentionally empty: this example has no values of its own to print
   }
 
 }
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneExampleBase.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneExampleBase.java
new file mode 100644
index 0000000..02c2ee7
--- /dev/null
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneExampleBase.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.examples;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.RemoteMember;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+
+abstract class StandAloneExampleBase {
+  // written by the thread running the read loop, read by the monitor thread via getLastInput()
+  private volatile String lastInput = "{none}";
+
+  private boolean clearTerminalScreen = true;
+  private GossipManager gossipService = null;
+
+  /** Prints example-specific values; the monitor loop calls it after the member lists. */
+  abstract void printValues(GossipManager gossipService);
+
+  /** Handles one line read from stdin; the result marks the line valid or invalid. */
+  boolean processReadLoopInput(String line) {
+    return true;
+  }
+
+  /** Calls init() on the gossip service, starts the monitor thread, then optionally reads stdin. */
+  void exec(boolean willRead) throws IOException {
+    gossipService.init();
+    startMonitorLoop(gossipService);
+    if (willRead) {
+      startBlockingReadLoop();
+    }
+  }
+
+  /*
+   * Look for -s in args. If there, suppress terminal-clear on write results: shift args for
+   * positional args, if necessary
+   */
+  String[] checkArgsForClearFlag(String[] args) {
+    int pos = 0;
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-s")) {
+        clearTerminalScreen = false;
+      } else {
+        // in the case of the -s flag, shift args
+        // down by one slot; this will end up with
+        // a duplicate entry in the last position of args,
+        // but this is ok, because it will be ignored
+        args[pos++] = args[i];
+      }
+    }
+    return args;
+  }
+
+  private void optionallyClearTerminal() {
+    if (clearTerminalScreen) {
+      System.out.print("\033[H\033[2J");
+      System.out.flush();
+    }
+  }
+
+  private void setLastInput(String input, boolean valid) {
+    // one assignment, so the monitor thread never sees the value without its "(invalid)" tag
+    lastInput = valid ? input : input + " (invalid)";
+  }
+
+  String getLastInput() {
+    return lastInput;
+  }
+
+  private void startMonitorLoop(GossipManager gossipService) {
+    new Thread(() -> {
+      while (true) {
+        optionallyClearTerminal();
+        printLiveMembers(gossipService);
+        printDeadMembers(gossipService);
+        printValues(gossipService);
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          // restore the flag and stop monitoring rather than swallowing the interrupt
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    }).start();
+  }
+
+  private void printLiveMembers(GossipManager gossipService) {
+    printMembers("Live: ", gossipService.getLiveMembers());
+  }
+
+  private void printDeadMembers(GossipManager gossipService) {
+    printMembers("Dead: ", gossipService.getDeadMembers());
+  }
+
+  /** Prints one member per line; only the first line carries the label. */
+  private void printMembers(String label, List<LocalMember> members) {
+    if (members.isEmpty()) {
+      System.out.println(label + "(none)");
+      return;
+    }
+    System.out.println(label + members.get(0));
+    for (int i = 1; i < members.size(); i++) {
+      System.out.println("    : " + members.get(i));
+    }
+  }
+
+  private void startBlockingReadLoop() throws IOException {
+    String line;
+    try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))) {
+      while ((line = br.readLine()) != null) {
+        System.out.println(line);
+        boolean valid = processReadLoopInput(line);
+        setLastInput(line, valid);
+      }
+    }
+  }
+
+  void initGossipManager(String[] args) {
+    GossipSettings s = new GossipSettings();
+    s.setWindowSize(1000);
+    s.setGossipInterval(100);
+    GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster("mycluster")
+            .uri(URI.create(args[0])).id(args[1])
+            .gossipMembers(Collections
+                    .singletonList(new RemoteMember("mycluster", URI.create(args[2]), args[3])))
+            .gossipSettings(s).build();
+    setGossipService(gossipService);
+  }
+
+  void setGossipService(GossipManager gossipService) {
+    this.gossipService = gossipService;
+  }
+
+  GossipManager getGossipManager() {
+    return this.gossipService;
+  }
+
+}
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNode.java
index 70c3e4d..953e784 100644
--- a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNode.java
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNode.java
@@ -17,37 +17,26 @@
  */
 package org.apache.gossip.examples;
 
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.RemoteMember;
+import java.io.IOException;
+
 import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
 
-public class StandAloneNode {
+public class StandAloneNode extends StandAloneExampleBase {
 
-  private static ExampleCommon common = new ExampleCommon();
+  private static boolean WILL_READ = false;
 
-  public static void main(String[] args) throws UnknownHostException, InterruptedException {
-    args = common.checkArgsForClearFlag(args);
-    GossipSettings s = new GossipSettings();
-    s.setWindowSize(1000);
-    s.setGossipInterval(100);
-    GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster("mycluster")
-            .uri(URI.create(args[0]))
-            .id(args[1])
-            .gossipMembers(
-                    Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3])))
-            .gossipSettings(s)
-            .build();
-    gossipService.init();
-    while (true) {
-      common.optionallyClearTerminal();
-      System.out.println("Live: " + gossipService.getLiveMembers());
-      System.out.println("Dead: " + gossipService.getDeadMembers());
-      Thread.sleep(2000);
-    }
+  public static void main(String[] args) throws InterruptedException, IOException {
+    StandAloneNode example = new StandAloneNode(args);
+    example.exec(WILL_READ);
+  }
+
+  StandAloneNode(String[] args) {
+    args = super.checkArgsForClearFlag(args);
+    super.initGossipManager(args);
+  }
+
+  @Override
+  void printValues(GossipManager gossipService) {
   }
 
 }
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
index 78c7782..a184bc4 100644
--- a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
@@ -17,85 +17,89 @@
  */
 package org.apache.gossip.examples;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.util.Arrays;
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.RemoteMember;
+
 import org.apache.gossip.crdt.GrowOnlyCounter;
 import org.apache.gossip.crdt.OrSet;
 import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
 import org.apache.gossip.model.SharedDataMessage;
 
-public class StandAloneNodeCrdtOrSet {
+public class StandAloneNodeCrdtOrSet extends StandAloneExampleBase {
 
-  private static ExampleCommon common = new ExampleCommon();
+  private static final String INDEX_KEY_FOR_SET = "abc";
+
+  private static final String INDEX_KEY_FOR_COUNTER = "def";
 
   public static void main(String[] args) throws InterruptedException, IOException {
-    args = common.checkArgsForClearFlag(args);
-    GossipSettings s = new GossipSettings();
-    s.setWindowSize(1000);
-    s.setGossipInterval(100);
-    GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster("mycluster")
-            .uri(URI.create(args[0]))
-            .id(args[1])
-            .gossipMembers(
-                    Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3])))
-            .gossipSettings(s)
-            .build();
-    gossipService.init();
-
-    new Thread(() -> {
-      while (true) {
-        common.optionallyClearTerminal();
-        System.out.println("Live: " + gossipService.getLiveMembers());
-        System.out.println("Dead: " + gossipService.getDeadMembers());
-        System.out.println("---------- " + (gossipService.findCrdt("abc") == null ? ""
-                : gossipService.findCrdt("abc").value()));
-        System.out.println("********** " + gossipService.findCrdt("abc"));
-        System.out.println("^^^^^^^^^^ " + (gossipService.findCrdt("def") == null ? ""
-                : gossipService.findCrdt("def").value()));
-        System.out.println("$$$$$$$$$$ " + gossipService.findCrdt("def"));
-        try {
-          Thread.sleep(2000);
-        } catch (Exception e) {
-        }
-      }
-    }).start();
-
-    String line = null;
-    try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))) {
-      while ((line = br.readLine()) != null) {
-        System.out.println(line);
-        char op = line.charAt(0);
-        String val = line.substring(2);
-        if (op == 'a') {
-          addData(val, gossipService);
-        } else if (op == 'r') {
-          removeData(val, gossipService);
-        } else if (op == 'g') {
-          gcount(val, gossipService);
-        }
-        if (op == 'l') {
-          listen(val, gossipService);
-        }
-      }
-    }
+    StandAloneNodeCrdtOrSet example = new StandAloneNodeCrdtOrSet(args);
+    boolean willRead = true;
+    example.exec(willRead);
   }
-  
+
+  StandAloneNodeCrdtOrSet(String[] args) {
+    args = super.checkArgsForClearFlag(args);
+    super.initGossipManager(args);
+  }
+
+  void printValues(GossipManager gossipService) {
+    System.out.println("Last Input: " + getLastInput());
+    System.out.println("---------- Or Set " + (gossipService.findCrdt(INDEX_KEY_FOR_SET) == null
+            ? "" : gossipService.findCrdt(INDEX_KEY_FOR_SET).value()));
+    System.out.println("********** " + gossipService.findCrdt(INDEX_KEY_FOR_SET));
+    System.out.println("^^^^^^^^^^ Grow Only Counter " // space keeps label and value apart
+            + (gossipService.findCrdt(INDEX_KEY_FOR_COUNTER) == null
+                    ? "" : gossipService.findCrdt(INDEX_KEY_FOR_COUNTER).value()));
+    System.out.println("$$$$$$$$$$ " + gossipService.findCrdt(INDEX_KEY_FOR_COUNTER));
+  }
+
+  /**
+   * Handles "<op> <value>": a = add to set, r = remove from set, g = grow counter, l = listen.
+   * Returns false for an unknown op, a malformed value, or a line too short to parse.
+   */
+  @Override
+  boolean processReadLoopInput(String line) {
+    if (line.length() < 2) {
+      return false; // charAt(0)/substring(2) below would throw on such a line
+    }
+    char op = line.charAt(0);
+    String val = line.substring(2);
+    if (op == 'a') {
+      addData(val, getGossipManager());
+    } else if (op == 'r') {
+      removeData(val, getGossipManager());
+    } else if ((op == 'g') && isNonNegativeNumber(val)) {
+      gcount(val, getGossipManager());
+    } else if ((op == 'l') // equals(), not ==: val is a fresh String read from stdin
+            && (INDEX_KEY_FOR_SET.equals(val) || INDEX_KEY_FOR_COUNTER.equals(val))) {
+      listen(val, getGossipManager());
+    } else {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Tells whether val parses as a long that is zero or greater.
+   */
+  private boolean isNonNegativeNumber(String val) {
+    try {
+      return Long.parseLong(val) >= 0;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
   private static void listen(String val, GossipManager gossipManager) {
     gossipManager.registerSharedDataSubscriber((key, oldValue, newValue) -> {
       if (key.equals(val)) {
-        System.out.println("Event Handler fired! " + oldValue + " " + newValue);
+        System.out.println(
+                "Event Handler fired for key = '" + key + "'! " + oldValue + " " + newValue);
       }
     });
   }
-  
+
   private static void gcount(String val, GossipManager gossipManager) {
-    GrowOnlyCounter c = (GrowOnlyCounter) gossipManager.findCrdt("def");
+    GrowOnlyCounter c = (GrowOnlyCounter) gossipManager.findCrdt(INDEX_KEY_FOR_COUNTER);
     Long l = Long.valueOf(val);
     if (c == null) {
       c = new GrowOnlyCounter(new GrowOnlyCounter.Builder(gossipManager).increment((l)));
@@ -104,7 +108,7 @@
     }
     SharedDataMessage m = new SharedDataMessage();
     m.setExpireAt(Long.MAX_VALUE);
-    m.setKey("def");
+    m.setKey(INDEX_KEY_FOR_COUNTER);
     m.setPayload(c);
     m.setTimestamp(System.currentTimeMillis());
     gossipManager.merge(m);
@@ -112,10 +116,10 @@
 
   private static void removeData(String val, GossipManager gossipService) {
     @SuppressWarnings("unchecked")
-    OrSet<String> s = (OrSet<String>) gossipService.findCrdt("abc");
+    OrSet<String> s = (OrSet<String>) gossipService.findCrdt(INDEX_KEY_FOR_SET);
     SharedDataMessage m = new SharedDataMessage();
     m.setExpireAt(Long.MAX_VALUE);
-    m.setKey("abc");
+    m.setKey(INDEX_KEY_FOR_SET);
     m.setPayload(new OrSet<String>(s, new OrSet.Builder<String>().remove(val)));
     m.setTimestamp(System.currentTimeMillis());
     gossipService.merge(m);
@@ -124,7 +128,7 @@
   private static void addData(String val, GossipManager gossipService) {
     SharedDataMessage m = new SharedDataMessage();
     m.setExpireAt(Long.MAX_VALUE);
-    m.setKey("abc");
+    m.setKey(INDEX_KEY_FOR_SET);
     m.setPayload(new OrSet<String>(val));
     m.setTimestamp(System.currentTimeMillis());
     gossipService.merge(m);
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java
index b0015be..23e949b 100644
--- a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java
@@ -17,116 +17,55 @@
  */
 package org.apache.gossip.examples;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.List;
 
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.LocalMember;
-import org.apache.gossip.RemoteMember;
 import org.apache.gossip.crdt.PNCounter;
 import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
 import org.apache.gossip.model.SharedDataMessage;
 
-public class StandAlonePNCounter {
-  private static ExampleCommon common = new ExampleCommon();
-  private static String lastInput = "{None}";
+public class StandAlonePNCounter extends StandAloneExampleBase {
 
   public static void main(String[] args) throws InterruptedException, IOException {
-    args = common.checkArgsForClearFlag(args);
-    GossipSettings s = new GossipSettings();
-    s.setWindowSize(1000);
-    s.setGossipInterval(100);
-    GossipManager gossipService = GossipManagerBuilder
-            .newBuilder()
-            .cluster("mycluster")
-            .uri(URI.create(args[0])).id(args[1])
-            .gossipMembers(
-                    Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3])))
-            .gossipSettings(s)
-            .build();
-    gossipService.init();
-
-    new Thread(() -> {
-      while (true) {
-        common.optionallyClearTerminal();
-        printLiveMembers(gossipService);
-        printDeadMambers(gossipService);
-        printValues(gossipService);
-        try {
-          Thread.sleep(2000);
-        } catch (Exception ignore) {
-        }
-      }
-    }).start();
-
-    String line = null;
-    try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))) {
-      while ((line = br.readLine()) != null) {
-        System.out.println(line);
-        char op = line.charAt(0);
-        char blank = line.charAt(1);
-        String val = line.substring(2);
-        Long l = null;
-        boolean valid = true;
-        try {
-           l = Long.valueOf(val);
-        } catch (NumberFormatException ex) {
-          valid = false;
-        }
-        valid = valid &&
-          (
-            (blank == ' ') &&
-            ((op == 'i') || (op == 'd'))
-          );
-        if (valid) {
-          if (op == 'i') {
-            increment(l, gossipService);
-          } else if (op == 'd') {
-            decrement(l, gossipService);
-          }
-        }
-        setLastInput(line,valid);
-      }
-    }
+    StandAlonePNCounter example = new StandAlonePNCounter(args);
+    boolean willRead = true;
+    example.exec(willRead);
   }
 
-  private static void printValues(GossipManager gossipService) {
+  StandAlonePNCounter(String[] args) {
+    args = super.checkArgsForClearFlag(args);
+    super.initGossipManager(args);
+  }
+
+  void printValues(GossipManager gossipService) {
     System.out.println("Last Input: " + getLastInput());
     System.out.println("---------- " + (gossipService.findCrdt("myPNCounter") == null ? ""
             : gossipService.findCrdt("myPNCounter").value()));
     System.out.println("********** " + gossipService.findCrdt("myPNCounter"));
   }
 
-  private static void printDeadMambers(GossipManager gossipService) {
-    List<LocalMember> members = gossipService.getDeadMembers();
-    if (members.isEmpty()) {
-       System.out.println("Dead: (none)");
-       return;
+  boolean processReadLoopInput(String line) {
+    if (line.length() < 2) {
+      return false; // too short for "<op> <number>": charAt(1) below would throw
     }
-    System.out.println("Dead: " + members.get(0));
-    for (int i = 1; i < members.size(); i++) {
-      System.out.println("    : " + members.get(i)); 
+    char op = line.charAt(0);
+    boolean valid = (line.charAt(1) == ' ') && ((op == 'i') || (op == 'd'));
+    Long l = null;
+    try {
+      l = Long.valueOf(line.substring(2));
+    } catch (NumberFormatException ex) {
+      valid = false;
     }
+    if (valid) {
+      if (op == 'i') {
+        increment(l, getGossipManager());
+      } else {
+        decrement(l, getGossipManager());
+      }
+    }
+    return valid;
   }
 
-  private static void printLiveMembers(GossipManager gossipService) {
-    List<LocalMember> members = gossipService.getLiveMembers();
-    if (members.isEmpty()) {
-       System.out.println("Live: (none)");
-       return;
-    }
-    System.out.println("Live: " + members.get(0));
-    for (int i = 1; i < members.size(); i++) {
-      System.out.println("    : " + members.get(i)); 
-    }
-  }
-
-  private static void increment(Long l, GossipManager gossipManager) {
+  void increment(Long l, GossipManager gossipManager) {
     PNCounter c = (PNCounter) gossipManager.findCrdt("myPNCounter");
     if (c == null) {
       c = new PNCounter(new PNCounter.Builder(gossipManager).increment((l)));
@@ -141,7 +80,7 @@
     gossipManager.merge(m);
   }
 
-  private static void decrement(Long l, GossipManager gossipManager) {
+  void decrement(Long l, GossipManager gossipManager) {
     PNCounter c = (PNCounter) gossipManager.findCrdt("myPNCounter");
     if (c == null) {
       c = new PNCounter(new PNCounter.Builder(gossipManager).decrement((l)));
@@ -155,16 +94,5 @@
     m.setTimestamp(System.currentTimeMillis());
     gossipManager.merge(m);
   }
-  
-  private static void setLastInput(String input, boolean valid) {
-    lastInput = input;
-    if (! valid) {
-      lastInput += " (invalid)";
-    }
-  }
-
-  private static String getLastInput() {
-    return lastInput;
-  }
 
 }
\ No newline at end of file
diff --git a/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataReplicationControlTest.java b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataReplicationControlTest.java
new file mode 100644
index 0000000..e715410
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataReplicationControlTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import io.teknek.tunit.TUnit;
+import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.SharedDataMessage;
+import org.apache.gossip.replication.*;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(JUnitPlatform.class)
+public class PerNodeDataReplicationControlTest extends AbstractIntegrationBase {
+
+  // Gossips per-node data from nodes.get(0) under each replication policy and waits until the
+  // permitted peers can read it. Absence of the data on excluded peers is not asserted.
+  @Test
+  public void perNodeDataReplicationTest()
+          throws InterruptedException, UnknownHostException, URISyntaxException {
+    generateStandardNodes(3);
+
+    // wait for full discovery: each of the 3 nodes must list the other 2 as live (3 * 2 = 6)
+    TUnit.assertThat(() -> {
+      int total = 0;
+      for (GossipManager node : nodes) {
+        total += node.getLiveMembers().size();
+      }
+      return total;
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(6);
+
+    // Adding new per node data to Node 1 with default replication (replicate all)
+    nodes.get(0).gossipPerNodeData(getPerNodeData("public", "I am visible to all",
+            new AllReplicable<>()));
+    // Adding new per node data to Node 1 with no replication (replicate none)
+    nodes.get(0).gossipPerNodeData(getPerNodeData("private", "I am private",
+            new NotReplicable<>()));
+
+    List<LocalMember> whiteList = new ArrayList<>();
+    whiteList.add(nodes.get(1).getMyself());
+    // Adding new per node data to Node 1 with white list Node 2
+    nodes.get(0).gossipPerNodeData(getPerNodeData("wl", "white list",
+            new WhiteListReplicable<>(whiteList)));
+
+    List<LocalMember> blackList = new ArrayList<>();
+    blackList.add(nodes.get(1).getMyself());
+    // Adding new per node data to Node 1 with black list Node 2
+    nodes.get(0).gossipPerNodeData(getPerNodeData("bl", "black list",
+            new BlackListReplicable<>(blackList)));
+
+    // Node 2 and 3 must have the per-node data with key 'public'
+    TUnit.assertThat(() -> {
+      PerNodeDataMessage message = nodes.get(1).findPerNodeGossipData("1", "public");
+      if(message == null){
+        return "";
+      }else {
+        return message.getPayload();
+      }
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am visible to all");
+
+    TUnit.assertThat(() -> {
+      PerNodeDataMessage message = nodes.get(2).findPerNodeGossipData("1", "public");
+      if(message == null){
+        return "";
+      }else {
+        return message.getPayload();
+      }
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am visible to all");
+
+    // Node 2 (white-listed) must have the per-node data with key wl
+    TUnit.assertThat(() -> {
+      PerNodeDataMessage message = nodes.get(1).findPerNodeGossipData("1", "wl");
+      if(message == null){
+        return "";
+      }else {
+        return message.getPayload();
+      }
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("white list");
+
+    // Node 3 (not black-listed) must have the per-node data with key bl
+    TUnit.assertThat(() -> {
+      PerNodeDataMessage message = nodes.get(2).findPerNodeGossipData("1", "bl");
+      if(message == null){
+        return "";
+      }else {
+        return message.getPayload();
+      }
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("black list");
+  }
+
+  // Starts four nodes across two data centers and waits until per-node data published with
+  // DataCenterReplicable can be read on the other node of the publisher's data center.
+  // Only delivery is checked; nothing asserts that the data is withheld from the other center.
+  @Test
+  public void perNodeDataDcReplicationTest()
+          throws InterruptedException, UnknownHostException, URISyntaxException {
+
+    String cluster = UUID.randomUUID().toString();
+    RemoteMember seeder = new RemoteMember(cluster, URI.create("udp://127.0.0.1:5001"), "1");
+
+    // gossip intervals, passed to the gossiper as string-valued properties
+    Map<String, String> intervals = new HashMap<>();
+    intervals.put("sameRackGossipIntervalMs", "500");
+    intervals.put("differentDatacenterGossipIntervalMs", "1000");
+
+    GossipSettings settings = new GossipSettings();
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
+    settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
+    settings.setActiveGossipProperties(intervals);
+
+    // DataCenter1: node 1 on Rack1 and node 2 on Rack2
+    createDcNode(URI.create("udp://127.0.0.1:5001"), "1", settings, seeder, cluster,
+            "DataCenter1", "Rack1");
+    createDcNode(URI.create("udp://127.0.0.1:5002"), "2", settings, seeder, cluster,
+            "DataCenter1", "Rack2");
+
+    // DataCenter2: nodes 6 and 7, both on Rack1
+    createDcNode(URI.create("udp://127.0.0.1:5006"), "6", settings, seeder, cluster,
+            "DataCenter2", "Rack1");
+    createDcNode(URI.create("udp://127.0.0.1:5007"), "7", settings, seeder, cluster,
+            "DataCenter2", "Rack1");
+
+    // discovery is complete once each of the 4 nodes lists the other 3 as live (4 * 3 = 12)
+    TUnit.assertThat(() -> {
+      int liveCount = 0;
+      for (int idx = 0; idx < 4; idx++) {
+        liveCount += nodes.get(idx).getLiveMembers().size();
+      }
+      return liveCount;
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(12);
+
+    // Node 1 publishes per-node key 'Dc1Rack1'
+    PerNodeDataMessage fromDc1 = getPerNodeData("Dc1Rack1", "I am belong to Dc1",
+            new DataCenterReplicable<>());
+    nodes.get(0).gossipPerNodeData(fromDc1);
+
+    // Node 6 publishes per-node key 'Dc2Rack1'
+    PerNodeDataMessage fromDc2 = getPerNodeData("Dc2Rack1", "I am belong to Dc2",
+            new DataCenterReplicable<>());
+    nodes.get(2).gossipPerNodeData(fromDc2);
+
+    // Node 2 must receive node 1's 'Dc1Rack1' entry
+    // ("" stands in for the payload while nothing has arrived yet)
+    TUnit.assertThat(() -> {
+      PerNodeDataMessage found = nodes.get(1).findPerNodeGossipData("1", "Dc1Rack1");
+      return found == null ? "" : found.getPayload();
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am belong to Dc1");
+
+    // Node 7 must receive node 6's 'Dc2Rack1' entry
+    // ("" stands in for the payload while nothing has arrived yet)
+    TUnit.assertThat(() -> {
+      PerNodeDataMessage found = nodes.get(3).findPerNodeGossipData("6", "Dc2Rack1");
+      return found == null ? "" : found.getPayload();
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am belong to Dc2");
+  }
+
+  private PerNodeDataMessage getPerNodeData(String key, String value,
+          Replicable<PerNodeDataMessage> replicable) {
+    PerNodeDataMessage message = new PerNodeDataMessage(); // fixture handed to gossipPerNodeData
+    message.setKey(key);
+    message.setPayload(value);
+    message.setReplicable(replicable); // replication policy under test
+    message.setTimestamp(System.currentTimeMillis());
+    message.setExpireAt(Long.MAX_VALUE); // largest representable expiry value
+    return message;
+  }
+
+  private void createDcNode(URI uri, String id, GossipSettings settings, RemoteMember seeder,
+          String cluster, String dataCenter, String rack){
+    // location properties, keyed by the rack-aware gossiper's DATACENTER and RACK constants
+    Map<String, String> location = new HashMap<>();
+    location.put(DatacenterRackAwareActiveGossiper.DATACENTER, dataCenter);
+    location.put(DatacenterRackAwareActiveGossiper.RACK, rack);
+    GossipManager manager = GossipManagerBuilder.newBuilder()
+            .cluster(cluster).uri(uri).id(id).gossipSettings(settings)
+            .gossipMembers(Arrays.asList(seeder)).properties(location).build();
+    manager.init(); // initialise first, then hand the node to register(...)
+    register(manager);
+  }
+
+}
diff --git a/gossip-itest/src/test/java/org/apache/gossip/SharedDataReplicationControlTest.java b/gossip-itest/src/test/java/org/apache/gossip/SharedDataReplicationControlTest.java
new file mode 100644
index 0000000..8ce063d
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/SharedDataReplicationControlTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import io.teknek.tunit.TUnit;
+import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.SharedDataMessage;
+import org.apache.gossip.replication.AllReplicable;
+import org.apache.gossip.replication.BlackListReplicable;
+import org.apache.gossip.replication.DataCenterReplicable;
+import org.apache.gossip.replication.NotReplicable;
+import org.apache.gossip.replication.Replicable;
+import org.apache.gossip.replication.WhiteListReplicable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(JUnitPlatform.class)
+public class SharedDataReplicationControlTest extends AbstractIntegrationBase {
+
+  // Gossips shared data from nodes.get(0) per policy; only delivery to allowed peers is asserted.
+  @Test
+  public void sharedDataReplicationTest()
+          throws InterruptedException, UnknownHostException, URISyntaxException {
+    generateStandardNodes(3);
+    // wait for full discovery: each of the 3 nodes must list the other 2 as live (3 * 2 = 6)
+    TUnit.assertThat(() -> {
+      int total = 0;
+      for (GossipManager node : nodes) {
+        total += node.getLiveMembers().size();
+      }
+      return total;
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(6);
+
+    // Adding new shared data to Node 1 with default replication (replicate all)
+    nodes.get(0).gossipSharedData(sharedNodeData("public", "I am visible to all",
+            new AllReplicable<>()));
+    // Adding new shared data to Node 1 with no replication (replicate none)
+    nodes.get(0).gossipSharedData(sharedNodeData("private", "I am private",
+            new NotReplicable<>()));
+
+    List<LocalMember> whiteList = new ArrayList<>();
+    whiteList.add(nodes.get(1).getMyself());
+    // Adding new shared data to Node 1 with white list Node 2
+    nodes.get(0).gossipSharedData(sharedNodeData("wl", "white list",
+            new WhiteListReplicable<>(whiteList)));
+
+    List<LocalMember> blackList = new ArrayList<>();
+    blackList.add(nodes.get(1).getMyself());
+    // Adding new shared data to Node 1 with black list Node 2
+    nodes.get(0).gossipSharedData(sharedNodeData("bl", "black list",
+            new BlackListReplicable<>(blackList)));
+
+    TUnit.assertThat(() -> {
+      SharedDataMessage message = nodes.get(1).findSharedGossipData("public");
+      if(message == null){
+        return "";
+      }else {
+        return message.getPayload();
+      }
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am visible to all");
+
+    TUnit.assertThat(() -> {
+      SharedDataMessage message = nodes.get(2).findSharedGossipData("public");
+      if(message == null){
+        return "";
+      }else {
+        return message.getPayload();
+      }
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am visible to all");
+
+    // Node 2 (white-listed) must have shared data with key wl
+    TUnit.assertThat(() -> {
+      SharedDataMessage message = nodes.get(1).findSharedGossipData("wl");
+      if(message == null){
+        return "";
+      }else {
+        return message.getPayload();
+      }
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("white list");
+
+    // Node 3 (not black-listed) must have shared data with key bl
+    TUnit.assertThat(() -> {
+      SharedDataMessage message = nodes.get(2).findSharedGossipData("bl");
+      if(message == null){
+        return "";
+      }else {
+        return message.getPayload();
+      }
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("black list");
+  }
+
+  // Starts four nodes across two data centers and waits until shared data published with
+  // DataCenterReplicable can be read on the other node of the publisher's data center.
+  // Only delivery is checked; nothing asserts that the data is withheld from the other center.
+  @Test
+  public void sharedDataDcReplicationTest()
+          throws InterruptedException, UnknownHostException, URISyntaxException {
+    String cluster = UUID.randomUUID().toString();
+    RemoteMember seeder = new RemoteMember(cluster, URI.create("udp://127.0.0.1:5001"), "1");
+
+    // gossip intervals, passed to the gossiper as string-valued properties
+    Map<String, String> intervals = new HashMap<>();
+    intervals.put("sameRackGossipIntervalMs", "500");
+    intervals.put("differentDatacenterGossipIntervalMs", "1000");
+
+    GossipSettings settings = new GossipSettings();
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
+    settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
+    settings.setActiveGossipProperties(intervals);
+
+    // DataCenter1: node 1 on Rack1 and node 2 on Rack2
+    createDcNode(URI.create("udp://127.0.0.1:5001"), "1", settings, seeder, cluster,
+            "DataCenter1", "Rack1");
+    createDcNode(URI.create("udp://127.0.0.1:5002"), "2", settings, seeder, cluster,
+            "DataCenter1", "Rack2");
+
+    // DataCenter2: nodes 6 and 7, both on Rack1
+    createDcNode(URI.create("udp://127.0.0.1:5006"), "6", settings, seeder, cluster,
+            "DataCenter2", "Rack1");
+    createDcNode(URI.create("udp://127.0.0.1:5007"), "7", settings, seeder, cluster,
+            "DataCenter2", "Rack1");
+
+    // discovery is complete once each of the 4 nodes lists the other 3 as live (4 * 3 = 12)
+    TUnit.assertThat(() -> {
+      int liveCount = 0;
+      for (int idx = 0; idx < 4; idx++) {
+        liveCount += nodes.get(idx).getLiveMembers().size();
+      }
+      return liveCount;
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(12);
+
+    // Node 1 publishes shared key 'Dc1Rack1'
+    SharedDataMessage fromDc1 = sharedNodeData("Dc1Rack1", "I am belong to Dc1",
+            new DataCenterReplicable<>());
+    nodes.get(0).gossipSharedData(fromDc1);
+
+    // Node 6 publishes shared key 'Dc2Rack1'
+    SharedDataMessage fromDc2 = sharedNodeData("Dc2Rack1", "I am belong to Dc2",
+            new DataCenterReplicable<>());
+    nodes.get(2).gossipSharedData(fromDc2);
+
+    // Node 2 must receive the 'Dc1Rack1' entry
+    // ("" stands in for the payload while nothing has arrived yet)
+    TUnit.assertThat(() -> {
+      SharedDataMessage found = nodes.get(1).findSharedGossipData("Dc1Rack1");
+      return found == null ? "" : found.getPayload();
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am belong to Dc1");
+
+    // Node 7 must receive the 'Dc2Rack1' entry
+    // ("" stands in for the payload while nothing has arrived yet)
+    TUnit.assertThat(() -> {
+      SharedDataMessage found = nodes.get(3).findSharedGossipData("Dc2Rack1");
+      return found == null ? "" : found.getPayload();
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am belong to Dc2");
+  }
+
+  private SharedDataMessage sharedNodeData(String key, String value,
+          Replicable<SharedDataMessage> replicable) {
+    SharedDataMessage message = new SharedDataMessage(); // fixture handed to gossipSharedData
+    message.setKey(key);
+    message.setPayload(value);
+    message.setReplicable(replicable); // replication policy under test
+    message.setTimestamp(System.currentTimeMillis());
+    message.setExpireAt(Long.MAX_VALUE); // largest representable expiry value
+    return message;
+  }
+
+  private void createDcNode(URI uri, String id, GossipSettings settings, RemoteMember seeder,
+          String cluster, String dataCenter, String rack){
+    // location properties, keyed by the rack-aware gossiper's DATACENTER and RACK constants
+    Map<String, String> location = new HashMap<>();
+    location.put(DatacenterRackAwareActiveGossiper.DATACENTER, dataCenter);
+    location.put(DatacenterRackAwareActiveGossiper.RACK, rack);
+    GossipManager manager = GossipManagerBuilder.newBuilder()
+            .cluster(cluster).uri(uri).id(id).gossipSettings(settings)
+            .gossipMembers(Arrays.asList(seeder)).properties(location).build();
+    manager.init(); // initialise first, then hand the node to register(...)
+    register(manager);
+  }
+
+}