Merge branch 'GOSSIP-55' of https://github.com/Mirage20/incubator-gossip
diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventConstants.java b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventConstants.java
new file mode 100644
index 0000000..217087f
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventConstants.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.event.data;
+
+public final class DataEventConstants {
+  private DataEventConstants() { } // static constants only; not meant to be instantiated
+
+  // Names under which DataEventManager registers its gauges in the MetricRegistry
+  public static final String PER_NODE_DATA_SUBSCRIBERS_SIZE
+          = "gossip.event.data.pernode.subscribers.size";
+  public static final String PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE
+          = "gossip.event.data.pernode.subscribers.queue.size";
+  public static final String SHARED_DATA_SUBSCRIBERS_SIZE
+          = "gossip.event.data.shared.subscribers.size";
+  public static final String SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE
+          = "gossip.event.data.shared.subscribers.queue.size";
+
+  // Queue capacity and thread pool sizing for the per-node and shared data event executors
+  public static final int PER_NODE_DATA_QUEUE_SIZE = 64;
+  public static final int PER_NODE_DATA_CORE_POOL_SIZE = 1;
+  public static final int PER_NODE_DATA_MAX_POOL_SIZE = 30;
+  public static final int PER_NODE_DATA_KEEP_ALIVE_TIME_SECONDS = 1;
+  public static final int SHARED_DATA_QUEUE_SIZE = 64;
+  public static final int SHARED_DATA_CORE_POOL_SIZE = 1;
+  public static final int SHARED_DATA_MAX_POOL_SIZE = 30;
+  public static final int SHARED_DATA_KEEP_ALIVE_TIME_SECONDS = 1;
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventManager.java b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventManager.java
new file mode 100644
index 0000000..3124df1
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventManager.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.event.data;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tracks subscribers for per-node and shared data changes and delivers change events to them
+ * asynchronously.
+ *
+ * <p>Each event kind has its own thread pool backed by a bounded queue. Both pools use
+ * {@link ThreadPoolExecutor.DiscardOldestPolicy}: when a pool cannot accept another task the
+ * oldest queued notification is dropped, so subscribers are not guaranteed to see every update.
+ */
+public class DataEventManager {
+
+  private final List<UpdateNodeDataEventHandler> perNodeDataHandlers;
+  private final BlockingQueue<Runnable> perNodeDataHandlerQueue;
+  private final ExecutorService perNodeDataEventExecutor;
+  private final List<UpdateSharedDataEventHandler> sharedDataHandlers;
+  private final BlockingQueue<Runnable> sharedDataHandlerQueue;
+  private final ExecutorService sharedDataEventExecutor;
+
+  /**
+   * Creates empty subscriber lists and the two dispatch pools, and registers four gauges
+   * (subscriber count and queued-task count for each event kind) under the names defined in
+   * {@link DataEventConstants}.
+   *
+   * @param metrics registry that receives the gauges
+   */
+  public DataEventManager(MetricRegistry metrics) {
+    perNodeDataHandlers = new CopyOnWriteArrayList<>();
+    perNodeDataHandlerQueue = new ArrayBlockingQueue<>(DataEventConstants.PER_NODE_DATA_QUEUE_SIZE);
+    perNodeDataEventExecutor = newEventExecutor(
+            DataEventConstants.PER_NODE_DATA_CORE_POOL_SIZE,
+            DataEventConstants.PER_NODE_DATA_MAX_POOL_SIZE,
+            DataEventConstants.PER_NODE_DATA_KEEP_ALIVE_TIME_SECONDS,
+            perNodeDataHandlerQueue);
+
+    sharedDataHandlers = new CopyOnWriteArrayList<>();
+    sharedDataHandlerQueue = new ArrayBlockingQueue<>(DataEventConstants.SHARED_DATA_QUEUE_SIZE);
+    sharedDataEventExecutor = newEventExecutor(
+            DataEventConstants.SHARED_DATA_CORE_POOL_SIZE,
+            DataEventConstants.SHARED_DATA_MAX_POOL_SIZE,
+            DataEventConstants.SHARED_DATA_KEEP_ALIVE_TIME_SECONDS,
+            sharedDataHandlerQueue);
+
+    metrics.register(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_SIZE,
+            (Gauge<Integer>) () -> perNodeDataHandlers.size());
+    metrics.register(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE,
+            (Gauge<Integer>) () -> perNodeDataHandlerQueue.size());
+    metrics.register(DataEventConstants.SHARED_DATA_SUBSCRIBERS_SIZE,
+            (Gauge<Integer>) () -> sharedDataHandlers.size());
+    metrics.register(DataEventConstants.SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE,
+            (Gauge<Integer>) () -> sharedDataHandlerQueue.size());
+  }
+
+  /**
+   * Builds one dispatch pool with the discard-oldest rejection policy.
+   *
+   * @param corePoolSize core pool size handed to the executor
+   * @param maxPoolSize maximum pool size handed to the executor
+   * @param keepAliveSeconds idle time, in seconds, after which a thread is released
+   * @param queue bounded queue holding notifications that wait for a thread
+   * @return the configured executor
+   */
+  private static ExecutorService newEventExecutor(int corePoolSize, int maxPoolSize,
+          int keepAliveSeconds, BlockingQueue<Runnable> queue) {
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
+            keepAliveSeconds, TimeUnit.SECONDS, queue,
+            new ThreadPoolExecutor.DiscardOldestPolicy());
+    // This class never shuts its pools down. Letting idle core threads expire as well keeps a
+    // leftover non-daemon worker from holding the JVM open after the last event was delivered.
+    executor.allowCoreThreadTimeOut(true);
+    return executor;
+  }
+
+  /**
+   * Queues one {@code onUpdate(key, oldValue, newValue)} call per registered shared data
+   * subscriber.
+   *
+   * <p>Note the argument order: this method takes the new value before the old one, while the
+   * handler callback receives the old value first.
+   *
+   * @param key key of the shared datum that changed
+   * @param newValue value after the change
+   * @param oldValue value before the change
+   */
+  public void notifySharedData(final String key, final Object newValue, final Object oldValue) {
+    sharedDataHandlers.forEach(handler -> sharedDataEventExecutor
+            .execute(() -> handler.onUpdate(key, oldValue, newValue)));
+  }
+
+  /**
+   * Queues one {@code onUpdate(nodeId, key, oldValue, newValue)} call per registered per-node
+   * data subscriber. As with {@link #notifySharedData}, the new value is passed before the old.
+   *
+   * @param nodeId id of the node that changed the value
+   * @param key key of the datum that changed
+   * @param newValue value after the change
+   * @param oldValue value before the change
+   */
+  public void notifyPerNodeData(final String nodeId, final String key, final Object newValue,
+          final Object oldValue) {
+    perNodeDataHandlers.forEach(handler -> perNodeDataEventExecutor
+            .execute(() -> handler.onUpdate(nodeId, key, oldValue, newValue)));
+  }
+
+  /** Adds {@code handler} to the per-node data subscribers; adding it twice notifies it twice. */
+  public void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler) {
+    perNodeDataHandlers.add(handler);
+  }
+
+  /** Removes one registration of {@code handler}; does nothing if it is not registered. */
+  public void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler) {
+    perNodeDataHandlers.remove(handler);
+  }
+
+  /** Returns the number of registered per-node data subscribers. */
+  public int getPerNodeSubscribersSize() {
+    return perNodeDataHandlers.size();
+  }
+
+  /** Adds {@code handler} to the shared data subscribers; adding it twice notifies it twice. */
+  public void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler) {
+    sharedDataHandlers.add(handler);
+  }
+
+  /** Removes one registration of {@code handler}; does nothing if it is not registered. */
+  public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler) {
+    sharedDataHandlers.remove(handler);
+  }
+
+  /** Returns the number of registered shared data subscribers. */
+  public int getSharedDataSubscribersSize() {
+    return sharedDataHandlers.size();
+  }
+
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateNodeDataEventHandler.java b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateNodeDataEventHandler.java
new file mode 100644
index 0000000..ca88c17
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateNodeDataEventHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.event.data;
+
+/**
+ * Event handler interface for per-node data items.
+ * Classes that implement this interface are notified when a per-node data item changes.
+ */
+@FunctionalInterface
+public interface UpdateNodeDataEventHandler {
+  /**
+   * Called when a per-node datum changes.
+   *
+   * @param nodeId id of the node that changed the value
+   * @param key key of the datum
+   * @param oldValue previous value of the datum, or {@code null} if the datum is discovered
+   *                 for the first time
+   * @param newValue updated value of the datum
+   */
+  void onUpdate(String nodeId, String key, Object oldValue, Object newValue);
+
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateSharedDataEventHandler.java b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateSharedDataEventHandler.java
new file mode 100644
index 0000000..5655732
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateSharedDataEventHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.event.data;
+
+/**
+ * Event handler interface for shared data items.
+ * Classes that implement this interface are notified when shared data changes.
+ */
+@FunctionalInterface
+public interface UpdateSharedDataEventHandler {
+  /**
+   * Called when a shared data item changes.
+   *
+   * @param key key of the shared data item
+   * @param oldValue previous value, or {@code null} if the data is discovered for the first time
+   * @param newValue updated value of the data item
+   */
+  void onUpdate(String key, Object oldValue, Object newValue);
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
index e034432..4167664 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -20,12 +20,18 @@
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
-import org.apache.gossip.Member;
import org.apache.gossip.LocalMember;
+import org.apache.gossip.Member;
import org.apache.gossip.RemoteMember;
import org.apache.gossip.crdt.Crdt;
import org.apache.gossip.event.GossipState;
-import org.apache.gossip.model.*;
+import org.apache.gossip.event.data.DataEventManager;
+import org.apache.gossip.event.data.UpdateNodeDataEventHandler;
+import org.apache.gossip.event.data.UpdateSharedDataEventHandler;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.Response;
+import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.udp.Trackable;
import org.apache.log4j.Logger;
@@ -55,13 +61,15 @@
private final Meter messageSerdeException;
private final Meter tranmissionException;
private final Meter tranmissionSuccess;
-
+ private final DataEventManager eventManager;
+
public GossipCore(GossipManager manager, MetricRegistry metrics){
this.gossipManager = manager;
requests = new ConcurrentHashMap<>();
workQueue = new ArrayBlockingQueue<>(1024);
perNodeData = new ConcurrentHashMap<>();
sharedData = new ConcurrentHashMap<>();
+ eventManager = new DataEventManager(metrics);
metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size());
metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size());
@@ -76,6 +84,7 @@
while (true){
SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
if (previous == null){
+ eventManager.notifySharedData(message.getKey(), message.getPayload(), null);
return;
}
if (message.getPayload() instanceof Crdt){
@@ -88,12 +97,17 @@
merged.setPayload(mergedCrdt);
boolean replaced = sharedData.replace(message.getKey(), previous, merged);
if (replaced){
+ if(!merged.getPayload().equals(previous.getPayload())) {
+ eventManager
+ .notifySharedData(message.getKey(), merged.getPayload(), previous.getPayload());
+ }
return;
}
} else {
if (previous.getTimestamp() < message.getTimestamp()){
boolean result = sharedData.replace(message.getKey(), previous, message);
if (result){
+ eventManager.notifySharedData(message.getKey(), message.getPayload(), previous.getPayload());
return;
}
} else {
@@ -102,7 +116,7 @@
}
}
}
-
+
public void addPerNodeData(PerNodeDataMessage message){
ConcurrentHashMap<String,PerNodeDataMessage> nodeMap = new ConcurrentHashMap<>();
nodeMap.put(message.getKey(), message);
@@ -111,11 +125,16 @@
PerNodeDataMessage current = nodeMap.get(message.getKey());
if (current == null){
nodeMap.putIfAbsent(message.getKey(), message);
+ eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(), null);
} else {
if (current.getTimestamp() < message.getTimestamp()){
nodeMap.replace(message.getKey(), current, message);
+ eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(),
+ current.getPayload());
}
}
+ } else {
+ eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(), null);
}
}
@@ -178,7 +197,7 @@
sendInternal(message, uri);
if (latchAndBase == null){
return null;
- }
+ }
try {
boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS);
@@ -297,4 +316,20 @@
}
}
}
+ /** Registers {@code handler} for per-node data change events via the event manager. */
+ void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
+ eventManager.registerPerNodeDataSubscriber(handler);
+ }
+ /** Registers {@code handler} for shared data change events via the event manager. */
+ void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler){
+ eventManager.registerSharedDataSubscriber(handler);
+ }
+ /** Removes {@code handler} from the per-node data subscribers held by the event manager. */
+ void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
+ eventManager.unregisterPerNodeDataSubscriber(handler);
+ }
+ /** Removes {@code handler} from the shared data subscribers held by the event manager. */
+ void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){
+ eventManager.unregisterSharedDataSubscriber(handler);
+ }
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
index 133a79f..d839b2e 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -26,6 +26,8 @@
import org.apache.gossip.crdt.Crdt;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
+import org.apache.gossip.event.data.UpdateNodeDataEventHandler;
+import org.apache.gossip.event.data.UpdateSharedDataEventHandler;
import org.apache.gossip.manager.handlers.MessageHandler;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.SharedDataMessage;
@@ -348,4 +350,20 @@
return new File(manager.getSettings().getPathToDataState(), "pernodedata."
+ manager.getMyself().getClusterName() + "." + manager.getMyself().getId() + ".json");
}
+ /** Registers {@code handler} for per-node data change events; delegates to the gossip core. */
+ public void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
+ gossipCore.registerPerNodeDataSubscriber(handler);
+ }
+ /** Registers {@code handler} for shared data change events; delegates to the gossip core. */
+ public void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler){
+ gossipCore.registerSharedDataSubscriber(handler);
+ }
+ /** Unregisters {@code handler} from per-node data change events; delegates to the gossip core. */
+ public void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
+ gossipCore.unregisterPerNodeDataSubscriber(handler);
+ }
+ /** Unregisters {@code handler} from shared data change events; delegates to the gossip core. */
+ public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){
+ gossipCore.unregisterSharedDataSubscriber(handler);
+ }
}
diff --git a/gossip-base/src/test/java/org/apache/gossip/event/data/DataEventManagerTest.java b/gossip-base/src/test/java/org/apache/gossip/event/data/DataEventManagerTest.java
new file mode 100644
index 0000000..d9d778f
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/event/data/DataEventManagerTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.event.data;
+
+import com.codahale.metrics.MetricRegistry;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(JUnitPlatform.class)
+public class DataEventManagerTest {
+  // Released by a handler after it has recorded an event; the test thread waits on it
+  private static Semaphore semaphore;
+  private String receivedNodeId;
+  private String receivedKey;
+  private Object receivedNewValue;
+  private Object receivedOldValue;
+
+  @BeforeClass
+  public static void setup() {
+    semaphore = new Semaphore(0);
+  }
+
+  @Test
+  public void perNodeDataEventHandlerAddRemoveTest() {
+    DataEventManager eventManager = new DataEventManager(new MetricRegistry());
+
+    UpdateNodeDataEventHandler nodeDataEventHandler = (nodeId, key, oldValue, newValue) -> {
+    };
+
+    eventManager.registerPerNodeDataSubscriber(nodeDataEventHandler);
+    Assert.assertEquals(1, eventManager.getPerNodeSubscribersSize());
+    eventManager.unregisterPerNodeDataSubscriber(nodeDataEventHandler);
+    Assert.assertEquals(0, eventManager.getPerNodeSubscribersSize());
+  }
+
+  // Test whether the per node data events are fired for matching key
+  @Test
+  public void perNodeDataEventHandlerTest() throws InterruptedException {
+    DataEventManager eventManager = new DataEventManager(new MetricRegistry());
+    resetData();
+
+    // A new subscriber "Juliet" wants to be notified when per node data changes for the key "Romeo"
+    UpdateNodeDataEventHandler juliet = (nodeId, key, oldValue, newValue) -> {
+      if(!key.equals("Romeo")) return;
+      receivedNodeId = nodeId;
+      receivedKey = key;
+      receivedNewValue = newValue;
+      receivedOldValue = oldValue;
+      semaphore.release();
+    };
+    // Juliet registers with eventManager
+    eventManager.registerPerNodeDataSubscriber(juliet);
+    // Romeo is going to sleep after having dinner
+    eventManager.notifyPerNodeData("Montague", "Romeo", "sleeping", "eating");
+
+    // Juliet should be notified; fail right here if the handler never ran
+    Assert.assertTrue(semaphore.tryAcquire(2, TimeUnit.SECONDS));
+    Assert.assertEquals("Montague", receivedNodeId);
+    Assert.assertEquals("Romeo", receivedKey);
+    Assert.assertEquals("sleeping", receivedNewValue);
+    Assert.assertEquals("eating", receivedOldValue);
+
+    eventManager.unregisterPerNodeDataSubscriber(juliet);
+  }
+
+  @Test
+  public void sharedDataEventHandlerAddRemoveTest() {
+    DataEventManager eventManager = new DataEventManager(new MetricRegistry());
+
+    UpdateSharedDataEventHandler sharedDataEventHandler = (key, oldValue, newValue) -> {
+
+    };
+    eventManager.registerSharedDataSubscriber(sharedDataEventHandler);
+    Assert.assertEquals(1, eventManager.getSharedDataSubscribersSize());
+    eventManager.unregisterSharedDataSubscriber(sharedDataEventHandler);
+    Assert.assertEquals(0, eventManager.getSharedDataSubscribersSize());
+
+  }
+
+  // Test whether the shared data events are fired
+  @Test
+  public void sharedDataEventHandlerTest() throws InterruptedException {
+    DataEventManager eventManager = new DataEventManager(new MetricRegistry());
+    resetData();
+
+    // A new subscriber "Alice" wants to be notified when shared data changes for the key "technology"
+    UpdateSharedDataEventHandler alice = (key, oldValue, newValue) -> {
+      if(!key.equals("technology")) return;
+      receivedKey = key;
+      receivedNewValue = newValue;
+      receivedOldValue = oldValue;
+      semaphore.release();
+    };
+    // Alice registers with eventManager
+    eventManager.registerSharedDataSubscriber(alice);
+
+    // technology key gets changed
+    eventManager.notifySharedData("technology", "Java has lambda", "Java is fast");
+
+    // Alice should be notified; fail right here if the handler never ran
+    Assert.assertTrue(semaphore.tryAcquire(2, TimeUnit.SECONDS));
+    Assert.assertEquals("technology", receivedKey);
+    Assert.assertEquals("Java has lambda", receivedNewValue);
+    Assert.assertEquals("Java is fast", receivedOldValue);
+
+    eventManager.unregisterSharedDataSubscriber(alice);
+  }
+
+  // Test the MetricRegistry
+  @Test
+  public void metricRegistryTest() {
+    MetricRegistry registry = new MetricRegistry();
+
+    DataEventManager eventManager = new DataEventManager(registry);
+
+    UpdateNodeDataEventHandler nodeDataEventHandler = (nodeId, key, oldValue, newValue) -> {
+    };
+
+    UpdateSharedDataEventHandler sharedDataEventHandler = (key, oldValue, newValue) -> {
+    };
+
+    eventManager.registerPerNodeDataSubscriber(nodeDataEventHandler);
+    eventManager.registerSharedDataSubscriber(sharedDataEventHandler);
+
+    Assert.assertEquals(1,
+            registry.getGauges().get(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_SIZE).getValue());
+    Assert.assertEquals(0,
+            registry.getGauges().get(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE)
+                    .getValue());
+    Assert.assertEquals(1,
+            registry.getGauges().get(DataEventConstants.SHARED_DATA_SUBSCRIBERS_SIZE).getValue());
+    Assert.assertEquals(0,
+            registry.getGauges().get(DataEventConstants.SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE)
+                    .getValue());
+
+  }
+
+  private void resetData() {
+    semaphore.drainPermits(); // drop permits a timed-out earlier test may have left behind
+    receivedNodeId = null;
+    receivedKey = null;
+    receivedNewValue = null;
+    receivedOldValue = null;
+  }
+}
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
index 7d4db93..78c7782 100644
--- a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
@@ -79,10 +79,21 @@
} else if (op == 'g') {
gcount(val, gossipService);
}
+ if (op == 'l') {
+ listen(val, gossipService);
+ }
}
}
}
-
+ /** Registers a shared data subscriber that prints the old and new value when key {@code val} changes. */
+ private static void listen(String val, GossipManager gossipManager) {
+ gossipManager.registerSharedDataSubscriber((key, oldValue, newValue) -> {
+ if (key.equals(val)) {
+ System.out.println("Event Handler fired! " + oldValue + " " + newValue);
+ }
+ });
+ }
+
private static void gcount(String val, GossipManager gossipManager) {
GrowOnlyCounter c = (GrowOnlyCounter) gossipManager.findCrdt("def");
Long l = Long.valueOf(val);
diff --git a/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java
new file mode 100644
index 0000000..59136d1
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import io.teknek.tunit.TUnit;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(JUnitPlatform.class)
+public class PerNodeDataEventTest extends AbstractIntegrationBase {
+
+  private String receivedKey = "";
+  private String receivingNodeId = "";
+  private Object receivingNodeDataNewValue = "";
+  private Object receivingNodeDataOldValue = "";
+  private Semaphore lock = new Semaphore(0);
+
+  // Verifies that node 2 is told about data first added and later updated by node 1
+  @Test
+  public void perNodeDataEventTest()
+          throws InterruptedException, UnknownHostException, URISyntaxException {
+    GossipSettings settings = new GossipSettings();
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
+    String cluster = UUID.randomUUID().toString();
+    int seedNodes = 1;
+    List<Member> startupMembers = new ArrayList<>();
+    for (int i = 1; i < seedNodes + 1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      startupMembers.add(new RemoteMember(cluster, uri, i + ""));
+    }
+    final List<GossipManager> clients = new ArrayList<>();
+    final int clusterMembers = 2;
+    for (int i = 1; i < clusterMembers + 1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
+              .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
+      clients.add(gossipService);
+      gossipService.init();
+      register(gossipService);
+    }
+
+    // check whether the members are discovered
+    TUnit.assertThat(() -> {
+      int total = 0;
+      for (int i = 0; i < clusterMembers; ++i) {
+        total += clients.get(i).getLiveMembers().size();
+      }
+      return total;
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
+
+    // Node 2 is interested in data changes for the key "organization" and "category"
+    clients.get(1).registerPerNodeDataSubscriber((nodeId, key, oldValue, newValue) -> {
+      if (!key.equals("organization") && !key.equals("category")) return;
+      receivingNodeId = nodeId;
+      receivedKey = key;
+      receivingNodeDataOldValue = oldValue;
+      receivingNodeDataNewValue = newValue;
+      lock.release();
+    });
+
+    // Adding new data to Node 1, after the subscriber is in place so the event cannot be missed
+    clients.get(0).gossipPerNodeData(getPerNodeData("category", "distributed"));
+
+    // Node 2 first time adds Node 1 data
+    Assert.assertTrue(lock.tryAcquire(10, TimeUnit.SECONDS));
+    Assert.assertEquals("1", receivingNodeId);
+    Assert.assertEquals("category", receivedKey);
+    Assert.assertNull(receivingNodeDataOldValue);
+    Assert.assertEquals("distributed", receivingNodeDataNewValue);
+
+    // Node 1 adds new per node data
+    clients.get(0).gossipPerNodeData(getPerNodeData("organization", "apache"));
+    // Node 2 adds new data key from Node 1
+    Assert.assertTrue(lock.tryAcquire(10, TimeUnit.SECONDS));
+    Assert.assertEquals("1", receivingNodeId);
+    Assert.assertEquals("organization", receivedKey);
+    Assert.assertNull(receivingNodeDataOldValue);
+    Assert.assertEquals("apache", receivingNodeDataNewValue);
+
+    // Node 1 updates its value
+    clients.get(0).gossipPerNodeData(getPerNodeData("organization", "apache-gossip"));
+    // Node 2 updates existing value
+    Assert.assertTrue(lock.tryAcquire(10, TimeUnit.SECONDS));
+    Assert.assertEquals("1", receivingNodeId);
+    Assert.assertEquals("organization", receivedKey);
+    Assert.assertEquals("apache", receivingNodeDataOldValue);
+    Assert.assertEquals("apache-gossip", receivingNodeDataNewValue);
+
+  }
+
+  private PerNodeDataMessage getPerNodeData(String key, String value) {
+    PerNodeDataMessage g = new PerNodeDataMessage();
+    g.setExpireAt(Long.MAX_VALUE);
+    g.setKey(key);
+    g.setPayload(value);
+    g.setTimestamp(System.currentTimeMillis());
+    return g;
+  }
+
+}
diff --git a/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java b/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java
new file mode 100644
index 0000000..56f1657
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import io.teknek.tunit.TUnit;
+import org.apache.gossip.crdt.GrowOnlyCounter;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.SharedDataMessage;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration tests for shared data change events. A subscriber registered on one node must
+ * be called with the key, the previous value and the new value whenever shared data gossiped
+ * by another node is added or updated.
+ */
+@RunWith(JUnitPlatform.class)
+public class SharedDataEventTest extends AbstractIntegrationBase {
+
+  /** Seconds to wait for a single subscriber notification before failing the test. */
+  private static final long EVENT_TIMEOUT_SECONDS = 10;
+
+  // Last notification captured by the subscriber callback. The callback stores these fields
+  // and then releases a permit on lock; tests read them only after acquiring that permit.
+  private String receivedKey = "";
+  private Object receivingNodeDataNewValue = "";
+  private Object receivingNodeDataOldValue = "";
+  private final String gCounterKey = "gCounter";
+  private final Semaphore lock = new Semaphore(0);
+
+  /**
+   * Node 2 subscribes to the keys "category" and "organization" and must observe an initial
+   * value, a newly added key and an update of an existing key gossiped from node 1.
+   */
+  @Test
+  public void sharedDataEventTest()
+      throws InterruptedException, UnknownHostException, URISyntaxException {
+    final List<GossipManager> clients = startCluster(2);
+
+    // Adding new data to Node 1
+    clients.get(0).gossipSharedData(sharedNodeData("category", "distributed"));
+
+    // Node 2 is interested in data changes for the key "organization" and "category"
+    clients.get(1).registerSharedDataSubscriber((key, oldValue, newValue) -> {
+      if (!key.equals("organization") && !key.equals("category")) {
+        return;
+      }
+      receivedKey = key;
+      receivingNodeDataOldValue = oldValue;
+      receivingNodeDataNewValue = newValue;
+      lock.release();
+    });
+
+    // Node 2 first time gets shared data
+    awaitEvent();
+    Assert.assertEquals("category", receivedKey);
+    Assert.assertNull(receivingNodeDataOldValue);
+    Assert.assertEquals("distributed", receivingNodeDataNewValue);
+
+    // Node 1 adds new shared data
+    clients.get(0).gossipSharedData(sharedNodeData("organization", "apache"));
+    // Node 2 adds new shared data
+    awaitEvent();
+    Assert.assertEquals("organization", receivedKey);
+    Assert.assertNull(receivingNodeDataOldValue);
+    Assert.assertEquals("apache", receivingNodeDataNewValue);
+
+    // Node 1 updates its value
+    clients.get(0).gossipSharedData(sharedNodeData("organization", "apache-gossip"));
+    // Node 2 updates existing value
+    awaitEvent();
+    Assert.assertEquals("organization", receivedKey);
+    Assert.assertEquals("apache", receivingNodeDataOldValue);
+    Assert.assertEquals("apache-gossip", receivingNodeDataNewValue);
+  }
+
+  /**
+   * Node 2 subscribes to every key and must be notified when a grow-only counter is first
+   * merged on node 1 and again, with the old and new counters, after node 3 adds 4 to it.
+   */
+  @Test
+  public void CrdtDataChangeEventTest()
+      throws InterruptedException, UnknownHostException, URISyntaxException {
+    final List<GossipManager> clients = startCluster(3);
+
+    clients.get(1).registerSharedDataSubscriber((key, oldValue, newValue) -> {
+      receivedKey = key;
+      receivingNodeDataOldValue = oldValue;
+      receivingNodeDataNewValue = newValue;
+      lock.release();
+    });
+
+    // Add initial gCounter to Node 1
+    clients.get(0).merge(sharedNodeData(gCounterKey,
+        new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L))));
+
+    // Check if initial Crdt received
+    awaitEvent();
+    Assert.assertEquals("gCounter", receivedKey);
+    Assert.assertNull(receivingNodeDataOldValue);
+    Assert.assertTrue(receivingNodeDataNewValue instanceof GrowOnlyCounter);
+    Assert.assertEquals(1, ((GrowOnlyCounter) receivingNodeDataNewValue).value().longValue());
+
+    // Node 3 Updates the gCounter by 4
+    // NOTE(review): only node 2 is known to have the counter here; if node 3 has not received
+    // it yet, findCrdt may not return a counter - confirm and add a wait if needed.
+    GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(2).findCrdt(gCounterKey);
+    GrowOnlyCounter gcNew = new GrowOnlyCounter(gc,
+        new GrowOnlyCounter.Builder(clients.get(2)).increment(4L));
+    clients.get(2).merge(sharedNodeData(gCounterKey, gcNew));
+
+    // Check if Node 3's Crdt update is received in Node 2 event handler
+    awaitEvent();
+    Assert.assertEquals("gCounter", receivedKey);
+    Assert.assertTrue(receivingNodeDataOldValue instanceof GrowOnlyCounter);
+    Assert.assertEquals(1, ((GrowOnlyCounter) receivingNodeDataOldValue).value().longValue());
+    Assert.assertTrue(receivingNodeDataNewValue instanceof GrowOnlyCounter);
+    Assert.assertEquals(5, ((GrowOnlyCounter) receivingNodeDataNewValue).value().longValue());
+  }
+
+  /**
+   * Starts {@code clusterMembers} gossip nodes on 127.0.0.1 (ids "1".."n", UDP ports from 50001)
+   * with node 1 as the only seed, and waits until the nodes have discovered each other.
+   *
+   * @param clusterMembers number of nodes to start
+   * @return the started managers in id order; index 0 is node 1
+   */
+  private List<GossipManager> startCluster(final int clusterMembers)
+      throws InterruptedException, UnknownHostException, URISyntaxException {
+    GossipSettings settings = new GossipSettings();
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
+    String cluster = UUID.randomUUID().toString();
+    int seedNodes = 1;
+    List<Member> startupMembers = new ArrayList<>();
+    for (int i = 1; i < seedNodes + 1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      startupMembers.add(new RemoteMember(cluster, uri, i + ""));
+    }
+    final List<GossipManager> clients = new ArrayList<>();
+    for (int i = 1; i < clusterMembers + 1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
+          .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
+      clients.add(gossipService);
+      gossipService.init();
+      register(gossipService);
+    }
+
+    // check whether the members are discovered: every node must list all of its peers. The
+    // two-node case expects a total of 2, i.e. a node does not count itself.
+    final int expectedLiveTotal = clusterMembers * (clusterMembers - 1);
+    TUnit.assertThat(() -> {
+      int total = 0;
+      for (GossipManager client : clients) {
+        total += client.getLiveMembers().size();
+      }
+      return total;
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(expectedLiveTotal);
+    return clients;
+  }
+
+  /** Waits for one subscriber notification and fails the test if none arrives in time. */
+  private void awaitEvent() throws InterruptedException {
+    Assert.assertTrue("Timed out waiting for a shared data event",
+        lock.tryAcquire(EVENT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+  }
+
+  /** Builds a message for {@code key} with expiry Long.MAX_VALUE and the current timestamp. */
+  private SharedDataMessage sharedNodeData(String key, Object value) {
+    SharedDataMessage g = new SharedDataMessage();
+    g.setExpireAt(Long.MAX_VALUE);
+    g.setKey(key);
+    g.setPayload(value);
+    g.setTimestamp(System.currentTimeMillis());
+    return g;
+  }
+}