Merge branch 'GOSSIP-89' of https://github.com/Terry-Weymouth/incubator-gossip
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
index ab5cefa..396ec03 100644
--- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
@@ -20,9 +20,15 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.replication.BlackListReplicable;
+import org.apache.gossip.replication.Replicable;
+import org.apache.gossip.replication.WhiteListReplicable;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -81,6 +87,26 @@
@JsonProperty("n-counters") abstract Map<String, Long> getNCounters();
}
+@JsonTypeInfo(
+ use = JsonTypeInfo.Id.CLASS,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "type")
+abstract class ReplicableMixin {
+
+}
+
+abstract class WhiteListReplicableMixin {
+ @JsonCreator
+ WhiteListReplicableMixin(@JsonProperty("whiteListMembers") List<LocalMember> whiteListMembers) { }
+ @JsonProperty("whiteListMembers") abstract List<LocalMember> getWhiteListMembers();
+}
+
+abstract class BlackListReplicableMixin {
+ @JsonCreator
+ BlackListReplicableMixin(@JsonProperty("blackListMembers") List<LocalMember> blackListMembers) { }
+ @JsonProperty("blackListMembers") abstract List<LocalMember> getBlackListMembers();
+}
+
//If anyone wants to take a stab at this. please have at it
//https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java
public class CrdtModule extends SimpleModule {
@@ -101,6 +127,9 @@
context.setMixInAnnotations(LwwSet.Timestamps.class, LWWSetTimestampsMixin.class);
context.setMixInAnnotations(MaxChangeSet.class, MaxChangeSetMixin.class);
context.setMixInAnnotations(TwoPhaseSet.class, TwoPhaseSetMixin.class);
+ context.setMixInAnnotations(Replicable.class, ReplicableMixin.class);
+ context.setMixInAnnotations(WhiteListReplicable.class, WhiteListReplicableMixin.class);
+ context.setMixInAnnotations(BlackListReplicable.class, BlackListReplicableMixin.class);
}
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
index b73550e..adf2530 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
@@ -85,6 +85,10 @@
}
long startTime = System.currentTimeMillis();
for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
+ if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
+ .shouldReplicate(me, member, innerEntry.getValue())) {
+ continue;
+ }
UdpSharedDataMessage message = new UdpSharedDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
@@ -93,6 +97,7 @@
message.setNodeId(innerEntry.getValue().getNodeId());
message.setTimestamp(innerEntry.getValue().getTimestamp());
message.setPayload(innerEntry.getValue().getPayload());
+ message.setReplicable(innerEntry.getValue().getReplicable());
gossipCore.sendOneWay(message, member.getUri());
}
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
@@ -105,6 +110,10 @@
long startTime = System.currentTimeMillis();
for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()){
+ if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
+ .shouldReplicate(me, member, innerEntry.getValue())) {
+ continue;
+ }
UdpPerNodeDataMessage message = new UdpPerNodeDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
@@ -113,6 +122,7 @@
message.setNodeId(innerEntry.getValue().getNodeId());
message.setTimestamp(innerEntry.getValue().getTimestamp());
message.setPayload(innerEntry.getValue().getPayload());
+ message.setReplicable(innerEntry.getValue().getReplicable());
gossipCore.sendOneWay(message, member.getUri());
}
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java b/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java
index 2d1cdef..2394e76 100644
--- a/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java
+++ b/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java
@@ -17,6 +17,8 @@
*/
package org.apache.gossip.model;
+import org.apache.gossip.replication.Replicable;
+
public class PerNodeDataMessage extends Base {
private String nodeId;
@@ -24,7 +26,8 @@
private Object payload;
private Long timestamp;
private Long expireAt;
-
+ private Replicable<PerNodeDataMessage> replicable;
+
public String getNodeId() {
return nodeId;
}
@@ -55,10 +58,20 @@
public void setExpireAt(Long expireAt) {
this.expireAt = expireAt;
}
+
+ public Replicable<PerNodeDataMessage> getReplicable() {
+ return replicable;
+ }
+
+ public void setReplicable(Replicable<PerNodeDataMessage> replicable) {
+ this.replicable = replicable;
+ }
+
@Override
public String toString() {
return "GossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload
- + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]";
+ + ", timestamp=" + timestamp + ", expireAt=" + expireAt
+ + ", replicable=" + replicable + "]";
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java
index e423be8..4b1a1ea 100644
--- a/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java
+++ b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java
@@ -17,6 +17,9 @@
*/
package org.apache.gossip.model;
+import org.apache.gossip.replication.AllReplicable;
+import org.apache.gossip.replication.Replicable;
+
public class SharedDataMessage extends Base {
private String nodeId;
@@ -24,6 +27,7 @@
private Object payload;
private Long timestamp;
private Long expireAt;
+ private Replicable<SharedDataMessage> replicable;
public String getNodeId() {
return nodeId;
@@ -55,10 +59,20 @@
public void setExpireAt(Long expireAt) {
this.expireAt = expireAt;
}
+
+ public Replicable<SharedDataMessage> getReplicable() {
+ return replicable;
+ }
+
+ public void setReplicable(Replicable<SharedDataMessage> replicable) {
+ this.replicable = replicable;
+ }
+
@Override
public String toString() {
return "SharedGossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload
- + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]";
- }
+ + ", timestamp=" + timestamp + ", expireAt=" + expireAt
+ + ", replicable=" + replicable + "]";
+ }
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/AllReplicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/AllReplicable.java
new file mode 100644
index 0000000..573fd25
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/replication/AllReplicable.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.replication;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.model.Base;
+
+/**
+ * Replicable implementation which replicates data to any node. This is the default replication
+ * strategy if a data item not specified its replication behaviour.
+ *
+ * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface
+ * @see Replicable
+ */
+public class AllReplicable<T extends Base> implements Replicable<T> {
+
+ @Override
+ public boolean shouldReplicate(LocalMember me, LocalMember destination, T message) {
+ return true;
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/BlackListReplicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/BlackListReplicable.java
new file mode 100644
index 0000000..33e1706
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/replication/BlackListReplicable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.replication;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.model.Base;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Replicable implementation which does not replicate data to given set of nodes.
+ *
+ * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface
+ * @see Replicable
+ */
+public class BlackListReplicable<T extends Base> implements Replicable<T> {
+
+ private final List<LocalMember> blackListMembers;
+
+ public BlackListReplicable(List<LocalMember> blackListMembers) {
+ if (blackListMembers == null) {
+ this.blackListMembers = new ArrayList<>();
+ } else {
+ this.blackListMembers = blackListMembers;
+ }
+ }
+
+ public List<LocalMember> getBlackListMembers() {
+ return blackListMembers;
+ }
+
+ @Override
+ public boolean shouldReplicate(LocalMember me, LocalMember destination, T message) {
+ return !blackListMembers.contains(destination);
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/DataCenterReplicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/DataCenterReplicable.java
new file mode 100644
index 0000000..1067c49
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/replication/DataCenterReplicable.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.replication;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
+import org.apache.gossip.model.Base;
+
+/**
+ * Replicable implementation which does replicate data only in the same data center.
+ *
+ * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface
+ * @see Replicable
+ */
+public class DataCenterReplicable<T extends Base> implements Replicable<T> {
+
+ @Override
+ public boolean shouldReplicate(LocalMember me, LocalMember destination, T message) {
+ if (!me.getProperties().containsKey(DatacenterRackAwareActiveGossiper.DATACENTER)) {
+ // replicate to others if I am not belong to any data center
+ return true;
+ } else if (!destination.getProperties()
+ .containsKey(DatacenterRackAwareActiveGossiper.DATACENTER)) {
+ // Do not replicate if the destination data center is not defined
+ return false;
+ } else {
+ return me.getProperties().get(DatacenterRackAwareActiveGossiper.DATACENTER)
+ .equals(destination.getProperties().get(DatacenterRackAwareActiveGossiper.DATACENTER));
+ }
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/NotReplicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/NotReplicable.java
new file mode 100644
index 0000000..c3fa538
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/replication/NotReplicable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.replication;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.model.Base;
+
+/**
+ * Replicable implementation which never replicates data on any node
+ *
+ * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface
+ * @see Replicable
+ */
+public class NotReplicable<T extends Base> implements Replicable<T> {
+
+ @Override
+ public boolean shouldReplicate(LocalMember me, LocalMember destination, T message) {
+ return false;
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/Replicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/Replicable.java
new file mode 100644
index 0000000..68098df
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/replication/Replicable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.replication;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.model.Base;
+
+/**
+ * This interface is used to determine whether a data item needs to be replicated to
+ * another gossip member.
+ *
+ * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface
+ */
+public interface Replicable<T extends Base> {
+ /**
+ * Test for a given data item needs to be replicated.
+ * @param me node that the data item is going to transmit from.
+ * @param destination target node to replicate.
+ * @param message this parameter is currently ignored
+ * @return true if the data item needs to be replicated to the destination. Otherwise false.
+ */
+ boolean shouldReplicate(LocalMember me, LocalMember destination, T message);
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/WhiteListReplicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/WhiteListReplicable.java
new file mode 100644
index 0000000..299d929
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/replication/WhiteListReplicable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.replication;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.model.Base;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Replicable implementation which replicates data to given set of nodes.
+ *
+ * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface
+ * @see Replicable
+ */
+public class WhiteListReplicable<T extends Base> implements Replicable<T> {
+
+ private final List<LocalMember> whiteListMembers;
+
+ public WhiteListReplicable(List<LocalMember> whiteListMembers) {
+ if (whiteListMembers == null) {
+ this.whiteListMembers = new ArrayList<>();
+ } else {
+ this.whiteListMembers = whiteListMembers;
+ }
+ }
+
+ public List<LocalMember> getWhiteListMembers() {
+ return whiteListMembers;
+ }
+
+ @Override
+ public boolean shouldReplicate(LocalMember me, LocalMember destination, T message) {
+ return whiteListMembers.contains(destination);
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java b/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java
index 6eb170a..9ba1e85 100644
--- a/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java
+++ b/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java
@@ -42,7 +42,8 @@
@Override
public String toString() {
- return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]";
+ return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid
+ + ", getReplicable()=" + getReplicable() + "]";
}
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java b/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java
index 1658503..0059bdb 100644
--- a/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java
+++ b/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java
@@ -44,7 +44,8 @@
public String toString() {
return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + ", getNodeId()="
+ getNodeId() + ", getKey()=" + getKey() + ", getPayload()=" + getPayload()
- + ", getTimestamp()=" + getTimestamp() + ", getExpireAt()=" + getExpireAt() + "]";
+ + ", getTimestamp()=" + getTimestamp() + ", getExpireAt()=" + getExpireAt()
+ + ", getReplicable()=" + getReplicable() + "]";
}
}
diff --git a/gossip-base/src/test/java/org/apache/gossip/AbstractIntegrationBase.java b/gossip-base/src/test/java/org/apache/gossip/AbstractIntegrationBase.java
index 896157f..d074706 100644
--- a/gossip-base/src/test/java/org/apache/gossip/AbstractIntegrationBase.java
+++ b/gossip-base/src/test/java/org/apache/gossip/AbstractIntegrationBase.java
@@ -18,24 +18,52 @@
package org.apache.gossip;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
import org.junit.After;
import org.junit.Before;
public abstract class AbstractIntegrationBase {
- List <GossipManager> nodes = new ArrayList<GossipManager>();
+ List <GossipManager> nodes = new ArrayList<>();
public void register(GossipManager manager){
nodes.add(manager);
}
-
+
+ public void generateStandardNodes(final int memberCount) throws URISyntaxException {
+ if(nodes.size() > 0){
+ after();
+ nodes.clear();
+ }
+ GossipSettings settings = new GossipSettings();
+ settings.setPersistRingState(false);
+ settings.setPersistDataState(false);
+ String cluster = UUID.randomUUID().toString();
+ int seedNodes = 1;
+ List<Member> startupMembers = new ArrayList<>();
+ for (int i = 1; i < seedNodes + 1; ++i) {
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ startupMembers.add(new RemoteMember(cluster, uri, i + ""));
+ }
+
+ for (int i = 1; i < memberCount + 1; ++i) {
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
+ .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
+ gossipService.init();
+ register(gossipService);
+ }
+ }
@Before
public void before(){
- nodes = new ArrayList<GossipManager>();
+ nodes = new ArrayList<>();
}
@After
diff --git a/gossip-base/src/test/java/org/apache/gossip/replication/DataReplicationTest.java b/gossip-base/src/test/java/org/apache/gossip/replication/DataReplicationTest.java
new file mode 100644
index 0000000..dd073a8
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/replication/DataReplicationTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.replication;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
+import org.apache.gossip.model.SharedDataMessage;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(JUnitPlatform.class)
+public class DataReplicationTest {
+
+ @Test
+ public void dataReplicateAllTest() throws URISyntaxException {
+ SharedDataMessage message = getSharedNodeData("public","public", new AllReplicable<>());
+ LocalMember me = getLocalMember(new URI("udp://127.0.0.1:8001"),"1");
+ LocalMember member = getLocalMember(new URI("udp://127.0.0.1:8002"),"2");
+ Assert.assertEquals(true, message.getReplicable().shouldReplicate(me, member, message));
+ }
+
+ @Test
+ public void dataReplicateNoneTest() throws URISyntaxException {
+ SharedDataMessage message = getSharedNodeData("private","private", new NotReplicable<>());
+ LocalMember me = getLocalMember(new URI("udp://127.0.0.1:8001"),"1");
+ LocalMember member = getLocalMember(new URI("udp://127.0.0.1:8002"),"2");
+ Assert.assertEquals(false, message.getReplicable().shouldReplicate(me, member, message));
+ }
+
+ @Test
+ public void dataReplicateWhiteListTest() throws URISyntaxException {
+ List<LocalMember> memberList = new ArrayList<>();
+ memberList.add(getLocalMember(new URI("udp://127.0.0.1:8001"),"1"));
+ memberList.add(getLocalMember(new URI("udp://127.0.0.1:8002"),"2"));
+ memberList.add(getLocalMember(new URI("udp://127.0.0.1:8003"),"3"));
+ // add node 1 and 2 to the white list
+ List<LocalMember> whiteList = new ArrayList<>();
+ whiteList.add(memberList.get(0));
+ whiteList.add(memberList.get(1));
+
+ SharedDataMessage message = getSharedNodeData("whiteList", "Only allow some nodes",
+ new WhiteListReplicable<>(whiteList));
+ LocalMember me = getLocalMember(new URI("udp://127.0.0.1:8004"),"4");
+
+ // data should replicate to node 1 and 2 but not 3
+ Assert.assertEquals(true,
+ message.getReplicable().shouldReplicate(me, memberList.get(0), message));
+ Assert.assertEquals(true,
+ message.getReplicable().shouldReplicate(me, memberList.get(1), message));
+ Assert.assertEquals(false,
+ message.getReplicable().shouldReplicate(me, memberList.get(2), message));
+ }
+
+ @Test
+ public void dataReplicateWhiteListNullTest() throws URISyntaxException {
+ List<LocalMember> memberList = new ArrayList<>();
+ memberList.add(getLocalMember(new URI("udp://127.0.0.1:8001"),"1"));
+ memberList.add(getLocalMember(new URI("udp://127.0.0.1:8002"),"2"));
+
+ SharedDataMessage message = getSharedNodeData("whiteList", "Only allow some nodes",
+ new WhiteListReplicable<>(null));
+
+ // data should not replicate if no whitelist specified
+ Assert.assertEquals(false,
+ message.getReplicable().shouldReplicate(memberList.get(0), memberList.get(1), message));
+ Assert.assertEquals(false,
+ message.getReplicable().shouldReplicate(memberList.get(1), memberList.get(0), message));
+
+ }
+
+ @Test
+ public void dataReplicateBlackListTest() throws URISyntaxException {
+ List<LocalMember> memberList = new ArrayList<>();
+ memberList.add(getLocalMember(new URI("udp://127.0.0.1:8001"),"1"));
+ memberList.add(getLocalMember(new URI("udp://127.0.0.1:8002"),"2"));
+ memberList.add(getLocalMember(new URI("udp://127.0.0.1:8003"),"3"));
+ // add node 1 and 2 to the black list
+ List<LocalMember> blackList = new ArrayList<>();
+ blackList.add(memberList.get(0));
+ blackList.add(memberList.get(1));
+
+ SharedDataMessage message = getSharedNodeData("blackList", "Disallow some nodes",
+ new BlackListReplicable<>(blackList));
+ LocalMember me = getLocalMember(new URI("udp://127.0.0.1:8004"),"4");
+
+ // data should not replicate to node 1 and 2
+ Assert.assertEquals(false,
+ message.getReplicable().shouldReplicate(me, memberList.get(0), message));
+ Assert.assertEquals(false,
+ message.getReplicable().shouldReplicate(me, memberList.get(1), message));
+ Assert.assertEquals(true,
+ message.getReplicable().shouldReplicate(me, memberList.get(2), message));
+ }
+
+ @Test
+ public void dataReplicateBlackListNullTest() throws URISyntaxException {
+
+ List<LocalMember> memberList = new ArrayList<>();
+ memberList.add(getLocalMember(new URI("udp://127.0.0.1:8001"),"1"));
+ memberList.add(getLocalMember(new URI("udp://127.0.0.1:8002"),"2"));
+
+ SharedDataMessage message = getSharedNodeData("blackList", "Disallow some nodes",
+ new BlackListReplicable<>(null));
+
+ // data should replicate if no blacklist specified
+ Assert.assertEquals(true,
+ message.getReplicable().shouldReplicate(memberList.get(0), memberList.get(1), message));
+ Assert.assertEquals(true,
+ message.getReplicable().shouldReplicate(memberList.get(1), memberList.get(0), message));
+ }
+
+ @Test
+ public void dataReplicateDataCenterTest() throws URISyntaxException {
+
+ List<LocalMember> memberListDc1 = new ArrayList<>();
+ List<LocalMember> memberListDc2 = new ArrayList<>();
+
+ memberListDc1
+ .add(getLocalMemberDc(new URI("udp://10.0.0.1:8000"), "1", "DataCenter1", "Rack1"));
+ memberListDc1
+ .add(getLocalMemberDc(new URI("udp://10.0.0.2:8000"), "2", "DataCenter1", "Rack2"));
+ memberListDc2
+ .add(getLocalMemberDc(new URI("udp://10.0.1.1:8000"), "11", "DataCenter2", "Rack1"));
+ memberListDc2
+ .add(getLocalMemberDc(new URI("udp://10.0.1.2:8000"), "12", "DataCenter2", "Rack2"));
+
+ SharedDataMessage message = getSharedNodeData("datacenter1", "I am in data center 1 rack 1",
+ new DataCenterReplicable<>());
+
+ // data should replicate in data center 1
+ Assert.assertEquals(true, message.getReplicable()
+ .shouldReplicate(memberListDc1.get(0), memberListDc1.get(1), message));
+ Assert.assertEquals(true, message.getReplicable()
+ .shouldReplicate(memberListDc2.get(0), memberListDc2.get(1), message));
+
+ // data should not replicate to data center 2
+ Assert.assertEquals(false, message.getReplicable()
+ .shouldReplicate(memberListDc1.get(0), memberListDc2.get(0), message));
+ Assert.assertEquals(false, message.getReplicable()
+ .shouldReplicate(memberListDc1.get(1), memberListDc2.get(1), message));
+ }
+
+ @Test
+ public void dataReplicateDataCenterUnknownDataCenterTest() throws URISyntaxException {
+
+ List<LocalMember> memberListDc1 = new ArrayList<>();
+ memberListDc1
+ .add(getLocalMemberDc(new URI("udp://10.0.0.1:8000"), "1", "DataCenter1", "Rack1"));
+
+ Map<String, String> properties = new HashMap<>();
+ LocalMember unknownDc = new LocalMember("cluster1", new URI("udp://10.0.1.2:8000"), "12", 0,
+ properties, 1, 0, "");
+
+ SharedDataMessage message = getSharedNodeData("datacenter1","I am in data center 1 rack 1", new DataCenterReplicable<>());
+
+ // data should not replicate from dc1 to unknown node
+ Assert.assertEquals(false, message.getReplicable()
+ .shouldReplicate(memberListDc1.get(0), unknownDc, message));
+ // data can replicate from unknown node to dc
+ Assert.assertEquals(true, message.getReplicable()
+ .shouldReplicate(unknownDc, memberListDc1.get(0), message));
+
+ }
+
+ private static SharedDataMessage getSharedNodeData(String key, String value,
+ Replicable<SharedDataMessage> replicable) {
+ SharedDataMessage g = new SharedDataMessage();
+ g.setExpireAt(Long.MAX_VALUE);
+ g.setKey(key);
+ g.setPayload(value);
+ g.setTimestamp(System.currentTimeMillis());
+ g.setReplicable(replicable);
+ return g;
+ }
+
+ private static LocalMember getLocalMember(URI uri, String id){
+ return new LocalMember("cluster1", uri, id, 0, null, 1, 0, "");
+ }
+
+ private static LocalMember getLocalMemberDc(URI uri, String id, String dataCenter, String rack){
+ Map<String, String> props = new HashMap<>();
+ props.put(DatacenterRackAwareActiveGossiper.DATACENTER, dataCenter);
+ props.put(DatacenterRackAwareActiveGossiper.RACK, rack);
+ return new LocalMember("cluster1", uri, id, 0, props, 1, 0, "");
+ }
+}
diff --git a/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataReplicationControlTest.java b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataReplicationControlTest.java
new file mode 100644
index 0000000..e715410
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataReplicationControlTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import io.teknek.tunit.TUnit;
+import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.SharedDataMessage;
+import org.apache.gossip.replication.*;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(JUnitPlatform.class)
+public class PerNodeDataReplicationControlTest extends AbstractIntegrationBase {
+
+ @Test
+ public void perNodeDataReplicationTest()
+ throws InterruptedException, UnknownHostException, URISyntaxException {
+
+ generateStandardNodes(3);
+
+ // check whether the members are discovered
+ TUnit.assertThat(() -> {
+ int total = 0;
+ for (GossipManager node : nodes) {
+ total += node.getLiveMembers().size();
+ }
+ return total;
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
+
+ // Adding new per node data to Node 1 with default replication (replicate all)
+ nodes.get(0).gossipPerNodeData(getPerNodeData("public", "I am visible to all",
+ new AllReplicable<>()));
+ // Adding new per node data to Node 1 with no replication (replicate none)
+ nodes.get(0).gossipPerNodeData(getPerNodeData("private", "I am private",
+ new NotReplicable<>()));
+
+ List<LocalMember> whiteList = new ArrayList<>();
+ whiteList.add(nodes.get(1).getMyself());
+ // Adding new per node data to Node 1 with white list Node 2
+ nodes.get(0).gossipPerNodeData(getPerNodeData("wl", "white list",
+ new WhiteListReplicable<>(whiteList)));
+
+ List<LocalMember> blackList = new ArrayList<>();
+ blackList.add(nodes.get(1).getMyself());
+ // Adding new per node data to Node 1 with black list Node 2
+ nodes.get(0).gossipPerNodeData(getPerNodeData("bl", "black list",
+ new BlackListReplicable<>(blackList)));
+
+ // Node 2 and 3 must have the shared data with key 'public'
+ TUnit.assertThat(() -> {
+ PerNodeDataMessage message = nodes.get(1).findPerNodeGossipData("1", "public");
+ if(message == null){
+ return "";
+ }else {
+ return message.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am visible to all");
+
+ TUnit.assertThat(() -> {
+ PerNodeDataMessage message = nodes.get(2).findPerNodeGossipData("1", "public");
+ if(message == null){
+ return "";
+ }else {
+ return message.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am visible to all");
+
+ // Node 2 must have shared data with key wl
+ TUnit.assertThat(() -> {
+ PerNodeDataMessage message = nodes.get(1).findPerNodeGossipData("1", "wl");
+ if(message == null){
+ return "";
+ }else {
+ return message.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("white list");
+
+ // Node 3 must have shared data with key bl
+ TUnit.assertThat(() -> {
+ PerNodeDataMessage message = nodes.get(2).findPerNodeGossipData("1", "bl");
+ if(message == null){
+ return "";
+ }else {
+ return message.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("black list");
+
+ }
+
+ @Test
+ public void perNodeDataDcReplicationTest()
+ throws InterruptedException, UnknownHostException, URISyntaxException {
+
+ GossipSettings settings = new GossipSettings();
+ settings.setPersistRingState(false);
+ settings.setPersistDataState(false);
+ String cluster = UUID.randomUUID().toString();
+ settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
+
+ Map<String, String> gossipProps = new HashMap<>();
+ gossipProps.put("sameRackGossipIntervalMs", "500");
+ gossipProps.put("differentDatacenterGossipIntervalMs", "1000");
+ settings.setActiveGossipProperties(gossipProps);
+
+ RemoteMember seeder = new RemoteMember(cluster, URI.create("udp://127.0.0.1:5001"), "1");
+
+ // initialize 2 data centers with each having two racks
+ createDcNode(URI.create("udp://127.0.0.1:5001"), "1", settings, seeder, cluster,
+ "DataCenter1", "Rack1");
+ createDcNode(URI.create("udp://127.0.0.1:5002"), "2", settings, seeder, cluster,
+ "DataCenter1", "Rack2");
+
+ createDcNode(URI.create("udp://127.0.0.1:5006"), "6", settings, seeder, cluster,
+ "DataCenter2", "Rack1");
+ createDcNode(URI.create("udp://127.0.0.1:5007"), "7", settings, seeder, cluster,
+ "DataCenter2", "Rack1");
+
+ // check whether the members are discovered
+ TUnit.assertThat(() -> {
+ int total = 0;
+ for (int i = 0; i < 4; ++i) {
+ total += nodes.get(i).getLiveMembers().size();
+ }
+ return total;
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(12);
+
+ // Node 1 has a shared key with 'Dc1Rack1'
+ nodes.get(0).gossipPerNodeData(getPerNodeData("Dc1Rack1", "I am belong to Dc1",
+ new DataCenterReplicable<>()));
+ // Node 6 has a shared key with 'Dc2Rack1'
+ nodes.get(2).gossipPerNodeData(getPerNodeData("Dc2Rack1", "I am belong to Dc2",
+ new DataCenterReplicable<>()));
+
+ // Node 2 must have the shared data with key 'Dc1Rack1'
+ TUnit.assertThat(() -> {
+ PerNodeDataMessage message = nodes.get(1).findPerNodeGossipData("1", "Dc1Rack1");
+ if(message == null){
+ return "";
+ }else {
+ return message.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am belong to Dc1");
+
+
+ // Node 7 must have the shared data with key 'Dc2Rack1'
+ TUnit.assertThat(() -> {
+ PerNodeDataMessage message = nodes.get(3).findPerNodeGossipData("6", "Dc2Rack1");
+ if(message == null){
+ return "";
+ }else {
+ return message.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am belong to Dc2");
+
+ }
+
+ private PerNodeDataMessage getPerNodeData(String key, String value,
+ Replicable<PerNodeDataMessage> replicable) {
+ PerNodeDataMessage g = new PerNodeDataMessage();
+ g.setExpireAt(Long.MAX_VALUE);
+ g.setKey(key);
+ g.setPayload(value);
+ g.setTimestamp(System.currentTimeMillis());
+ g.setReplicable(replicable);
+ return g;
+ }
+
+ private void createDcNode(URI uri, String id, GossipSettings settings, RemoteMember seeder,
+ String cluster, String dataCenter, String rack){
+ Map<String, String> props = new HashMap<>();
+ props.put(DatacenterRackAwareActiveGossiper.DATACENTER, dataCenter);
+ props.put(DatacenterRackAwareActiveGossiper.RACK, rack);
+
+ GossipManager dcNode = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri).id(id)
+ .gossipSettings(settings).gossipMembers(Arrays.asList(seeder)).properties(props)
+ .build();
+ dcNode.init();
+ register(dcNode);
+ }
+
+}
diff --git a/gossip-itest/src/test/java/org/apache/gossip/SharedDataReplicationControlTest.java b/gossip-itest/src/test/java/org/apache/gossip/SharedDataReplicationControlTest.java
new file mode 100644
index 0000000..8ce063d
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/SharedDataReplicationControlTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import io.teknek.tunit.TUnit;
+import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.SharedDataMessage;
+import org.apache.gossip.replication.AllReplicable;
+import org.apache.gossip.replication.BlackListReplicable;
+import org.apache.gossip.replication.DataCenterReplicable;
+import org.apache.gossip.replication.NotReplicable;
+import org.apache.gossip.replication.Replicable;
+import org.apache.gossip.replication.WhiteListReplicable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(JUnitPlatform.class)
+public class SharedDataReplicationControlTest extends AbstractIntegrationBase {
+
+ @Test
+ public void sharedDataReplicationTest()
+ throws InterruptedException, UnknownHostException, URISyntaxException {
+ generateStandardNodes(3);
+
+ // check whether the members are discovered
+ TUnit.assertThat(() -> {
+ int total = 0;
+ for (GossipManager node : nodes) {
+ total += node.getLiveMembers().size();
+ }
+ return total;
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
+
+ // Adding new shared data to Node 1 with default replication (replicate all)
+ nodes.get(0).gossipSharedData(sharedNodeData("public", "I am visible to all",
+ new AllReplicable<>()));
+ // Adding new shared data to Node 1 with no replication (replicate none)
+ nodes.get(0).gossipSharedData(sharedNodeData("private", "I am private",
+ new NotReplicable<>()));
+
+ List<LocalMember> whiteList = new ArrayList<>();
+ whiteList.add(nodes.get(1).getMyself());
+ // Adding new shared data to Node 1 with white list Node 2
+ nodes.get(0).gossipSharedData(sharedNodeData("wl", "white list",
+ new WhiteListReplicable<>(whiteList)));
+
+ List<LocalMember> blackList = new ArrayList<>();
+ blackList.add(nodes.get(1).getMyself());
+ // Adding new shared data to Node 1 with black list Node 2
+ nodes.get(0).gossipSharedData(sharedNodeData("bl", "black list",
+ new BlackListReplicable<>(blackList)));
+
+ TUnit.assertThat(() -> {
+ SharedDataMessage message = nodes.get(1).findSharedGossipData("public");
+ if(message == null){
+ return "";
+ }else {
+ return message.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am visible to all");
+
+ TUnit.assertThat(() -> {
+ SharedDataMessage message = nodes.get(2).findSharedGossipData("public");
+ if(message == null){
+ return "";
+ }else {
+ return message.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am visible to all");
+
+ // Node 2 must have shared data with key wl
+ TUnit.assertThat(() -> {
+ SharedDataMessage message = nodes.get(1).findSharedGossipData("wl");
+ if(message == null){
+ return "";
+ }else {
+ return message.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("white list");
+
+ // Node 3 must have shared data with key bl
+ TUnit.assertThat(() -> {
+ SharedDataMessage message = nodes.get(2).findSharedGossipData("bl");
+ if(message == null){
+ return "";
+ }else {
+ return message.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("black list");
+ }
+
+ @Test
+ public void sharedDataDcReplicationTest()
+ throws InterruptedException, UnknownHostException, URISyntaxException {
+
+ GossipSettings settings = new GossipSettings();
+ settings.setPersistRingState(false);
+ settings.setPersistDataState(false);
+ String cluster = UUID.randomUUID().toString();
+ settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
+
+ Map<String, String> gossipProps = new HashMap<>();
+ gossipProps.put("sameRackGossipIntervalMs", "500");
+ gossipProps.put("differentDatacenterGossipIntervalMs", "1000");
+ settings.setActiveGossipProperties(gossipProps);
+
+ RemoteMember seeder = new RemoteMember(cluster, URI.create("udp://127.0.0.1:5001"), "1");
+
+ // initialize 2 data centers with each having two racks
+ createDcNode(URI.create("udp://127.0.0.1:5001"), "1", settings, seeder, cluster,
+ "DataCenter1", "Rack1");
+ createDcNode(URI.create("udp://127.0.0.1:5002"), "2", settings, seeder, cluster,
+ "DataCenter1", "Rack2");
+
+ createDcNode(URI.create("udp://127.0.0.1:5006"), "6", settings, seeder, cluster,
+ "DataCenter2", "Rack1");
+ createDcNode(URI.create("udp://127.0.0.1:5007"), "7", settings, seeder, cluster,
+ "DataCenter2", "Rack1");
+
+ // check whether the members are discovered
+ TUnit.assertThat(() -> {
+ int total = 0;
+ for (int i = 0; i < 4; ++i) {
+ total += nodes.get(i).getLiveMembers().size();
+ }
+ return total;
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(12);
+
+ // Node 1 has a shared key with 'Dc1Rack1'
+ nodes.get(0).gossipSharedData(sharedNodeData("Dc1Rack1", "I am belong to Dc1",
+ new DataCenterReplicable<>()));
+ // Node 6 has a shared key with 'Dc2Rack1'
+ nodes.get(2).gossipSharedData(sharedNodeData("Dc2Rack1", "I am belong to Dc2",
+ new DataCenterReplicable<>()));
+
+ // Node 2 must have the shared data with key 'Dc1Rack1'
+ TUnit.assertThat(() -> {
+ SharedDataMessage message = nodes.get(1).findSharedGossipData("Dc1Rack1");
+ if(message == null){
+ return "";
+ }else {
+ return message.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am belong to Dc1");
+
+ // Node 7 must have the shared data with key 'Dc2Rack1'
+ TUnit.assertThat(() -> {
+ SharedDataMessage message = nodes.get(3).findSharedGossipData("Dc2Rack1");
+ if(message == null){
+ return "";
+ }else {
+ return message.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am belong to Dc2");
+
+ }
+
+ private SharedDataMessage sharedNodeData(String key, String value,
+ Replicable<SharedDataMessage> replicable) {
+ SharedDataMessage g = new SharedDataMessage();
+ g.setExpireAt(Long.MAX_VALUE);
+ g.setKey(key);
+ g.setPayload(value);
+ g.setTimestamp(System.currentTimeMillis());
+ g.setReplicable(replicable);
+ return g;
+ }
+
+ private void createDcNode(URI uri, String id, GossipSettings settings, RemoteMember seeder,
+ String cluster, String dataCenter, String rack){
+ Map<String, String> props = new HashMap<>();
+ props.put(DatacenterRackAwareActiveGossiper.DATACENTER, dataCenter);
+ props.put(DatacenterRackAwareActiveGossiper.RACK, rack);
+
+ GossipManager dcNode = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri).id(id)
+ .gossipSettings(settings).gossipMembers(Arrays.asList(seeder)).properties(props)
+ .build();
+ dcNode.init();
+ register(dcNode);
+ }
+
+}