GOSSIP-41 Transfer gossip data in bulk
diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
index 32c00c9..2ba9110 100644
--- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
@@ -61,6 +61,10 @@
private String pathToKeyStore = "./keys";
private boolean signMessages = false;
+
+ private boolean bulkTransfer = false;
+
+ private int bulkTransferSize = StartupSettings.DEFAULT_BULK_TRANSFER_SIZE;
/**
@@ -77,14 +81,15 @@
* @param cleanupInterval
* The cleanup interval in ms.
*/
- public GossipSettings(int gossipInterval, int cleanupInterval, int windowSize,
- int minimumSamples, double convictThreshold, String distribution) {
+ public GossipSettings(int gossipInterval, int cleanupInterval, int windowSize, int minimumSamples,
+ double convictThreshold, String distribution, boolean bulkTransfer) {
this.gossipInterval = gossipInterval;
this.cleanupInterval = cleanupInterval;
this.windowSize = windowSize;
this.minimumSamples = minimumSamples;
this.convictThreshold = convictThreshold;
this.distribution = distribution;
+ this.bulkTransfer = bulkTransfer;
}
/**
@@ -242,4 +247,19 @@
this.protocolManagerClass = protocolManagerClass;
}
+ public boolean isBulkTransfer() {
+ return bulkTransfer;
+ }
+
+ public void setBulkTransfer(boolean bulkTransfer) {
+ this.bulkTransfer = bulkTransfer;
+ }
+
+ public int getBulkTransferSize() {
+ return bulkTransferSize;
+ }
+
+ public void setBulkTransferSize(int bulkTransferSize) {
+ this.bulkTransferSize = bulkTransferSize;
+ }
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java b/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java
index dd30e88..23608f2 100644
--- a/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java
+++ b/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java
@@ -55,6 +55,10 @@
/** The list with gossip members to start with. */
private final List<Member> gossipMembers;
+ /** Default setting values */
+ private static final boolean DEFAULT_BULK_TRANSFER = false;
+ public static final int DEFAULT_BULK_TRANSFER_SIZE = 100;
+
/**
* Constructor.
*
@@ -175,6 +179,7 @@
properties.put(i.getKey(), i.getValue().asText());
}
//TODO constants as defaults?
+ // TODO setting keys as constants?
int gossipInterval = jsonObject.get("gossip_interval").intValue();
int cleanupInterval = jsonObject.get("cleanup_interval").intValue();
int windowSize = jsonObject.get("window_size").intValue();
@@ -182,6 +187,12 @@
double convictThreshold = jsonObject.get("convict_threshold").asDouble();
String cluster = jsonObject.get("cluster").textValue();
String distribution = jsonObject.get("distribution").textValue();
+ boolean bulkTransfer = jsonObject.has("bulk_transfer") ?
+ jsonObject.get("bulk_transfer").booleanValue() :
+ DEFAULT_BULK_TRANSFER;
+ int bulkTransferSize = jsonObject.has("bulk_transfer_size") ?
+ jsonObject.get("bulk_transfer_size").intValue() :
+ DEFAULT_BULK_TRANSFER_SIZE;
if (cluster == null){
throw new IllegalArgumentException("cluster was null. It is required");
}
@@ -192,8 +203,9 @@
jsonObject.get("protocol_manager_class").textValue() :
null;
URI uri2 = new URI(uri);
- GossipSettings gossipSettings = new GossipSettings(gossipInterval, cleanupInterval, windowSize, minSamples,
- convictThreshold, distribution);
+ GossipSettings gossipSettings = new GossipSettings(gossipInterval, cleanupInterval, windowSize,
+ minSamples, convictThreshold, distribution, bulkTransfer);
+ gossipSettings.setBulkTransferSize(bulkTransferSize);
if (transportClass != null) {
gossipSettings.setTransportManagerClass(transportClass);
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
index adf2530..4bd44f2 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
@@ -24,6 +24,7 @@
import java.util.concurrent.ConcurrentHashMap;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
+import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalMember;
import org.apache.gossip.model.ActiveGossipOk;
import org.apache.gossip.model.PerNodeDataMessage;
@@ -31,40 +32,40 @@
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.model.ShutdownMessage;
-import org.apache.gossip.udp.UdpActiveGossipMessage;
-import org.apache.gossip.udp.UdpPerNodeDataMessage;
-import org.apache.gossip.udp.UdpSharedDataMessage;
+import org.apache.gossip.udp.*;
import org.apache.log4j.Logger;
import static com.codahale.metrics.MetricRegistry.name;
/**
- * The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner
+ * The ActiveGossipThread sends information. Pick a random partner and send the membership list to that partner
*/
public abstract class AbstractActiveGossiper {
protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class);
-
+
protected final GossipManager gossipManager;
protected final GossipCore gossipCore;
private final Histogram sharedDataHistogram;
private final Histogram sendPerNodeDataHistogram;
- private final Histogram sendMembershipHistorgram;
+ private final Histogram sendMembershipHistogram;
private final Random random;
+ private final GossipSettings gossipSettings;
public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
this.gossipManager = gossipManager;
this.gossipCore = gossipCore;
sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time"));
sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time"));
- sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time"));
+ sendMembershipHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistogram-time"));
random = new Random();
+ gossipSettings = gossipManager.getSettings();
}
public void init() {
}
-
+
public void shutdown() {
}
@@ -78,12 +79,22 @@
m.setShutdownAtNanos(gossipManager.getClock().nanoTime());
gossipCore.sendOneWay(m, target.getUri());
}
-
- public final void sendSharedData(LocalMember me, LocalMember member){
- if (member == null){
+
+ public final void sendSharedData(LocalMember me, LocalMember member) {
+ if (member == null) {
return;
}
long startTime = System.currentTimeMillis();
+ if (gossipSettings.isBulkTransfer()) {
+ sendSharedDataInBulkInternal(me, member);
+ } else {
+ sendSharedDataInternal(me, member);
+ }
+ sharedDataHistogram.update(System.currentTimeMillis() - startTime);
+ }
+
+ /** Send shared data one entry at a time. */
+ private void sendSharedDataInternal(LocalMember me, LocalMember member) {
for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
.shouldReplicate(me, member, innerEntry.getValue())) {
@@ -92,22 +103,60 @@
UdpSharedDataMessage message = new UdpSharedDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
- message.setExpireAt(innerEntry.getValue().getExpireAt());
- message.setKey(innerEntry.getValue().getKey());
- message.setNodeId(innerEntry.getValue().getNodeId());
- message.setTimestamp(innerEntry.getValue().getTimestamp());
- message.setPayload(innerEntry.getValue().getPayload());
- message.setReplicable(innerEntry.getValue().getReplicable());
+ copySharedDataMessage(innerEntry.getValue(), message);
gossipCore.sendOneWay(message, member.getUri());
}
- sharedDataHistogram.update(System.currentTimeMillis() - startTime);
}
-
+
+ /** Send shared data by batching together several entries. */
+ private void sendSharedDataInBulkInternal(LocalMember me, LocalMember member) {
+ UdpSharedDataBulkMessage udpMessage = new UdpSharedDataBulkMessage();
+ udpMessage.setUuid(UUID.randomUUID().toString());
+ udpMessage.setUriFrom(me.getId());
+ for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()) {
+ if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
+ .shouldReplicate(me, member, innerEntry.getValue())) {
+ continue;
+ }
+ SharedDataMessage message = new SharedDataMessage();
+ copySharedDataMessage(innerEntry.getValue(), message);
+ udpMessage.addMessage(message);
+ if (udpMessage.getMessages().size() == gossipSettings.getBulkTransferSize()) {
+ gossipCore.sendOneWay(udpMessage, member.getUri());
+ udpMessage = new UdpSharedDataBulkMessage();
+ udpMessage.setUuid(UUID.randomUUID().toString());
+ udpMessage.setUriFrom(me.getId());
+ }
+ }
+ if (udpMessage.getMessages().size() > 0) {
+ gossipCore.sendOneWay(udpMessage, member.getUri());
+ }
+ }
+
+ private void copySharedDataMessage(SharedDataMessage original, SharedDataMessage copy) {
+ copy.setExpireAt(original.getExpireAt());
+ copy.setKey(original.getKey());
+ copy.setNodeId(original.getNodeId());
+ copy.setTimestamp(original.getTimestamp());
+ copy.setPayload(original.getPayload());
+ copy.setReplicable(original.getReplicable());
+ }
+
public final void sendPerNodeData(LocalMember me, LocalMember member){
if (member == null){
return;
}
long startTime = System.currentTimeMillis();
+ if (gossipSettings.isBulkTransfer()) {
+ sendPerNodeDataInBulkInternal(me, member);
+ } else {
+ sendPerNodeDataInternal(me, member);
+ }
+ sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
+ }
+
+ /** Send per node data one entry at a time. */
+ private void sendPerNodeDataInternal(LocalMember me, LocalMember member) {
for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()){
if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
@@ -117,18 +166,49 @@
UdpPerNodeDataMessage message = new UdpPerNodeDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
- message.setExpireAt(innerEntry.getValue().getExpireAt());
- message.setKey(innerEntry.getValue().getKey());
- message.setNodeId(innerEntry.getValue().getNodeId());
- message.setTimestamp(innerEntry.getValue().getTimestamp());
- message.setPayload(innerEntry.getValue().getPayload());
- message.setReplicable(innerEntry.getValue().getReplicable());
- gossipCore.sendOneWay(message, member.getUri());
+ copyPerNodeDataMessage(innerEntry.getValue(), message);
+ gossipCore.sendOneWay(message, member.getUri());
}
}
- sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
+
}
-
+
+ /** Send per node data by batching together several entries. */
+ private void sendPerNodeDataInBulkInternal(LocalMember me, LocalMember member) {
+ for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
+ UdpPerNodeDataBulkMessage udpMessage = new UdpPerNodeDataBulkMessage();
+ udpMessage.setUuid(UUID.randomUUID().toString());
+ udpMessage.setUriFrom(me.getId());
+ for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()){
+ if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
+ .shouldReplicate(me, member, innerEntry.getValue())) {
+ continue;
+ }
+ PerNodeDataMessage message = new PerNodeDataMessage();
+ copyPerNodeDataMessage(innerEntry.getValue(), message);
+ udpMessage.addMessage(message);
+ if (udpMessage.getMessages().size() == gossipSettings.getBulkTransferSize()) {
+ gossipCore.sendOneWay(udpMessage, member.getUri());
+ udpMessage = new UdpPerNodeDataBulkMessage();
+ udpMessage.setUuid(UUID.randomUUID().toString());
+ udpMessage.setUriFrom(me.getId());
+ }
+ }
+ if (udpMessage.getMessages().size() > 0) {
+ gossipCore.sendOneWay(udpMessage, member.getUri());
+ }
+ }
+ }
+
+ private void copyPerNodeDataMessage(PerNodeDataMessage original, PerNodeDataMessage copy) {
+ copy.setExpireAt(original.getExpireAt());
+ copy.setKey(original.getKey());
+ copy.setNodeId(original.getNodeId());
+ copy.setTimestamp(original.getTimestamp());
+ copy.setPayload(original.getPayload());
+ copy.setReplicable(original.getReplicable());
+ }
+
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
@@ -151,9 +231,9 @@
} else {
LOGGER.debug("Message " + message + " generated response " + r);
}
- sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
+ sendMembershipHistogram.update(System.currentTimeMillis() - startTime);
}
-
+
protected final Member convert(LocalMember member){
Member gm = new Member();
gm.setCluster(member.getClusterName());
@@ -163,9 +243,9 @@
gm.setProperties(member.getProperties());
return gm;
}
-
+
/**
- *
+ *
* @param memberList
* An immutable list
* @return The chosen LocalGossipMember to gossip with.
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java
index fff9430..abce76d 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java
@@ -20,37 +20,36 @@
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.ActiveGossipMessage;
-import org.apache.gossip.model.Base;
-import org.apache.gossip.model.PerNodeDataMessage;
-import org.apache.gossip.model.Response;
-import org.apache.gossip.model.SharedDataMessage;
-import org.apache.gossip.model.ShutdownMessage;
+import org.apache.gossip.model.*;
import java.util.Arrays;
public class MessageHandlerFactory {
-
+
public static MessageHandler defaultHandler() {
return concurrentHandler(
new TypedMessageHandler(Response.class, new ResponseHandler()),
new TypedMessageHandler(ShutdownMessage.class, new ShutdownMessageHandler()),
new TypedMessageHandler(PerNodeDataMessage.class, new PerNodeDataMessageHandler()),
new TypedMessageHandler(SharedDataMessage.class, new SharedDataMessageHandler()),
- new TypedMessageHandler(ActiveGossipMessage.class, new ActiveGossipMessageHandler())
+ new TypedMessageHandler(ActiveGossipMessage.class, new ActiveGossipMessageHandler()),
+ new TypedMessageHandler(PerNodeDataBulkMessage.class, new PerNodeDataBulkMessageHandler()),
+ new TypedMessageHandler(SharedDataBulkMessage.class, new SharedDataBulkMessageHandler())
);
}
-
+
public static MessageHandler concurrentHandler(MessageHandler... handlers) {
- if (handlers == null) throw new NullPointerException("handlers cannot be null");
+ if (handlers == null)
+ throw new NullPointerException("handlers cannot be null");
if (Arrays.asList(handlers).stream().filter(i -> i != null).count() != handlers.length) {
throw new NullPointerException("found at least one null handler");
}
return new MessageHandler() {
- @Override
- public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+ @Override public boolean invoke(GossipCore gossipCore, GossipManager gossipManager,
+ Base base) {
// return true if at least one of the component handlers return true.
- return Arrays.asList(handlers).stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
+ return Arrays.asList(handlers).stream()
+ .filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
}
};
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataBulkMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataBulkMessageHandler.java
new file mode 100644
index 0000000..37024e9
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataBulkMessageHandler.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.udp.UdpPerNodeDataBulkMessage;
+
+public class PerNodeDataBulkMessageHandler implements MessageHandler {
+
+ /**
+ * @param gossipCore context.
+ * @param gossipManager context.
+ * @param base message reference.
+ * @return boolean indicating success.
+ */
+ @Override
+ public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+ UdpPerNodeDataBulkMessage udpMessage = (UdpPerNodeDataBulkMessage) base;
+ for (PerNodeDataMessage dataMsg: udpMessage.getMessages())
+ gossipCore.addPerNodeData(dataMsg);
+ return true;
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
index 0ad0d91..4ac47b9 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
@@ -23,7 +23,7 @@
import org.apache.gossip.udp.UdpPerNodeDataMessage;
public class PerNodeDataMessageHandler implements MessageHandler {
-
+
/**
* @param gossipCore context.
* @param gossipManager context.
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataBulkMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataBulkMessageHandler.java
new file mode 100644
index 0000000..a062f95
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataBulkMessageHandler.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.model.SharedDataMessage;
+import org.apache.gossip.udp.UdpSharedDataBulkMessage;
+
+public class SharedDataBulkMessageHandler implements MessageHandler{
+
+ /**
+ * @param gossipCore context.
+ * @param gossipManager context.
+ * @param base message reference.
+ * @return boolean indicating success.
+ */
+ @Override
+ public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+ UdpSharedDataBulkMessage udpMessage = (UdpSharedDataBulkMessage) base;
+ for (SharedDataMessage dataMsg: udpMessage.getMessages())
+ gossipCore.addSharedData(dataMsg);
+ return true;
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/model/Base.java b/gossip-base/src/main/java/org/apache/gossip/model/Base.java
index 1b66310..e9183b0 100644
--- a/gossip-base/src/main/java/org/apache/gossip/model/Base.java
+++ b/gossip-base/src/main/java/org/apache/gossip/model/Base.java
@@ -19,9 +19,9 @@
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk;
-import org.apache.gossip.udp.UdpPerNodeDataMessage;
+import org.apache.gossip.udp.UdpPerNodeDataBulkMessage;
import org.apache.gossip.udp.UdpNotAMemberFault;
-import org.apache.gossip.udp.UdpSharedDataMessage;
+import org.apache.gossip.udp.UdpSharedDataBulkMessage;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonSubTypes;
@@ -40,9 +40,9 @@
@Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"),
@Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"),
@Type(value = PerNodeDataMessage.class, name = "PerNodeDataMessage"),
- @Type(value = UdpPerNodeDataMessage.class, name = "UdpPerNodeDataMessage"),
+ @Type(value = UdpPerNodeDataBulkMessage.class, name = "UdpPerNodeDataMessage"),
@Type(value = SharedDataMessage.class, name = "SharedDataMessage"),
- @Type(value = UdpSharedDataMessage.class, name = "UdpSharedDataMessage")
+ @Type(value = UdpSharedDataBulkMessage.class, name = "UdpSharedDataMessage")
})
public class Base {
diff --git a/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataBulkMessage.java b/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataBulkMessage.java
new file mode 100644
index 0000000..bb138a5
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataBulkMessage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.model;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PerNodeDataBulkMessage extends Base {
+ private List<PerNodeDataMessage> messages = new ArrayList<>();
+
+ public void addMessage(PerNodeDataMessage msg) {
+ messages.add(msg);
+ }
+
+ public List<PerNodeDataMessage> getMessages() {
+ return messages;
+ }
+
+ @Override public String toString() {
+ return "GossipDataBulkMessage[" + messages.stream().map(Object::toString)
+ .collect(Collectors.joining(",")) + "]";
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/model/SharedDataBulkMessage.java b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataBulkMessage.java
new file mode 100644
index 0000000..7b67430
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataBulkMessage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.model;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SharedDataBulkMessage extends Base {
+ private List<SharedDataMessage> messages = new ArrayList<>();
+
+ public void addMessage(SharedDataMessage msg) {
+ messages.add(msg);
+ }
+
+ public List<SharedDataMessage> getMessages() {
+ return messages;
+ }
+
+ @Override public String toString() {
+ return "SharedGossipDataBulkMessage[" + messages.stream().map(Object::toString)
+ .collect(Collectors.joining(",")) + "]";
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java
index 4b1a1ea..e148189 100644
--- a/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java
+++ b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java
@@ -28,7 +28,7 @@
private Long timestamp;
private Long expireAt;
private Replicable<SharedDataMessage> replicable;
-
+
public String getNodeId() {
return nodeId;
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataBulkMessage.java b/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataBulkMessage.java
new file mode 100644
index 0000000..99eb1e5
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataBulkMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.udp;
+
+import org.apache.gossip.model.PerNodeDataBulkMessage;
+
+public class UdpPerNodeDataBulkMessage extends PerNodeDataBulkMessage implements Trackable {
+
+ private String uriFrom;
+ private String uuid;
+
+ public String getUriFrom() {
+ return uriFrom;
+ }
+
+ public void setUriFrom(String uriFrom) {
+ this.uriFrom = uriFrom;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+ @Override
+ public String toString() {
+ return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid
+ + ", messages=[" + super.toString() + "] ]";
+ }
+
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataBulkMessage.java b/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataBulkMessage.java
new file mode 100644
index 0000000..8dc8be1
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataBulkMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.udp;
+
+import org.apache.gossip.model.SharedDataBulkMessage;
+
+public class UdpSharedDataBulkMessage extends SharedDataBulkMessage implements Trackable {
+
+ private String uriFrom;
+ private String uuid;
+
+ public String getUriFrom() {
+ return uriFrom;
+ }
+
+ public void setUriFrom(String uriFrom) {
+ this.uriFrom = uriFrom;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+ @Override
+ public String toString() {
+ return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + ", getNodeId()="
+ + ", messages=[" + super.toString() + "] ]";
+ }
+
+}
diff --git a/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java
index 59136d1..cc269f9 100644
--- a/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java
+++ b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java
@@ -23,19 +23,17 @@
import org.apache.gossip.model.PerNodeDataMessage;
import org.junit.Assert;
import org.junit.Test;
-import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-@RunWith(JUnitPlatform.class)
+@RunWith(Parameterized.class)
public class PerNodeDataEventTest extends AbstractIntegrationBase {
private String receivedKey = "";
@@ -43,25 +41,39 @@
private Object receivingNodeDataNewValue = "";
private Object receivingNodeDataOldValue = "";
private Semaphore lock = new Semaphore(0);
-
-
+ private int base;
+ private boolean bulkTransfer;
+
+ public PerNodeDataEventTest(int base, boolean bulkTransfer) {
+ this.base = base;
+ this.bulkTransfer = bulkTransfer;
+ }
+
+ @Parameterized.Parameters(name = "{index} bulkTransfer={1}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][]{
+ {50000, false}, {55000, true}
+ });
+ }
+
@Test
public void perNodeDataEventTest()
throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
+ settings.setBulkTransfer(bulkTransfer);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> clients = new ArrayList<>();
final int clusterMembers = 2;
for (int i = 1; i < clusterMembers + 1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
clients.add(gossipService);
diff --git a/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java b/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java
index 8dbfcb3..547cd45 100644
--- a/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java
+++ b/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java
@@ -24,19 +24,17 @@
import org.apache.gossip.model.SharedDataMessage;
import org.junit.Assert;
import org.junit.Test;
-import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-@RunWith(JUnitPlatform.class)
+@RunWith(Parameterized.class)
public class SharedDataEventTest extends AbstractIntegrationBase {
private String receivedKey = "";
@@ -44,6 +42,20 @@
private Object receivingNodeDataOldValue = "";
private String gCounterKey = "gCounter";
private Semaphore lock = new Semaphore(0);
+ private int base;
+ private boolean bulkTransfer;
+
+ public SharedDataEventTest(int base, boolean bulkTransfer) {
+ this.base = base;
+ this.bulkTransfer = bulkTransfer;
+ }
+
+ @Parameterized.Parameters(name = "{index} bulkTransfer={1}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][]{
+ {50000, false}, {55000, true}
+ });
+ }
@Test
public void sharedDataEventTest()
@@ -51,17 +63,18 @@
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
+ settings.setBulkTransfer(bulkTransfer);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> clients = new ArrayList<>();
final int clusterMembers = 2;
for (int i = 1; i < clusterMembers + 1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
clients.add(gossipService);
@@ -126,13 +139,13 @@
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> clients = new ArrayList<>();
final int clusterMembers = 3;
for (int i = 1; i < clusterMembers + 1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
clients.add(gossipService);
diff --git a/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index dd5bfe9..e5c3639 100644
--- a/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -49,7 +49,7 @@
@Test
public void DeadNodesDoNotComeAliveAgain()
throws InterruptedException, UnknownHostException, URISyntaxException {
- GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal");
+ GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal", false);
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
diff --git a/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index 8ae783e..eb9abcd 100644
--- a/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -47,7 +47,7 @@
}
public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException {
- GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential");
+ GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential", false);
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();