Small merge conflict
diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
index 4ea0ab6..2fe8a0c 100644
--- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
@@ -68,6 +68,10 @@
   private LockManagerSettings lockManagerSettings = LockManagerSettings
           .getLockManagerDefaultSettings();
   
+  private boolean bulkTransfer = false;
+
+  private int bulkTransferSize = StartupSettings.DEFAULT_BULK_TRANSFER_SIZE;
+  
   /**
    * Construct GossipSettings with default settings.
    */
@@ -82,14 +86,15 @@
    * @param cleanupInterval
    *          The cleanup interval in ms.
    */
-  public GossipSettings(int gossipInterval, int cleanupInterval, int windowSize, 
-          int minimumSamples, double convictThreshold, String distribution) {
+  public GossipSettings(int gossipInterval, int cleanupInterval, int windowSize, int minimumSamples,
+          double convictThreshold, String distribution, boolean bulkTransfer) {
     this.gossipInterval = gossipInterval;
     this.cleanupInterval = cleanupInterval;
     this.windowSize = windowSize;
     this.minimumSamples = minimumSamples;
     this.convictThreshold = convictThreshold;
     this.distribution = distribution;
+    this.bulkTransfer = bulkTransfer;
   }
 
   /**
@@ -258,4 +263,20 @@
   public void setLockManagerSettings(LockManagerSettings lockManagerSettings) {
     this.lockManagerSettings = lockManagerSettings;
   }
+
+  public boolean isBulkTransfer() {
+    return bulkTransfer;
+  }
+
+  public void setBulkTransfer(boolean bulkTransfer) {
+    this.bulkTransfer = bulkTransfer;
+  }
+
+  public int getBulkTransferSize() {
+    return bulkTransferSize;
+  }
+
+  public void setBulkTransferSize(int bulkTransferSize) {
+    this.bulkTransferSize = bulkTransferSize;
+  }
 }
diff --git a/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java b/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java
index dd30e88..23608f2 100644
--- a/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java
+++ b/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java
@@ -55,6 +55,10 @@
   /** The list with gossip members to start with. */
   private final List<Member> gossipMembers;
 
+  /** Default setting values */
+  private static final boolean DEFAULT_BULK_TRANSFER = false;
+  public static final int DEFAULT_BULK_TRANSFER_SIZE = 100;
+
   /**
    * Constructor.
    * 
@@ -175,6 +179,7 @@
       properties.put(i.getKey(), i.getValue().asText());
     }
     //TODO constants as defaults?
+    // TODO setting keys as constants?
     int gossipInterval = jsonObject.get("gossip_interval").intValue();
     int cleanupInterval = jsonObject.get("cleanup_interval").intValue();
     int windowSize = jsonObject.get("window_size").intValue();
@@ -182,6 +187,12 @@
     double convictThreshold = jsonObject.get("convict_threshold").asDouble();
     String cluster = jsonObject.get("cluster").textValue();
     String distribution = jsonObject.get("distribution").textValue();
+    boolean bulkTransfer = jsonObject.has("bulk_transfer") ?
+            jsonObject.get("bulk_transfer").booleanValue() :
+            DEFAULT_BULK_TRANSFER;
+    int bulkTransferSize = jsonObject.has("bulk_transfer_size") ?
+            jsonObject.get("bulk_transfer_size").intValue() :
+            DEFAULT_BULK_TRANSFER_SIZE;
     if (cluster == null){
       throw new IllegalArgumentException("cluster was null. It is required");
     }
@@ -192,8 +203,9 @@
         jsonObject.get("protocol_manager_class").textValue() : 
         null;
     URI uri2 = new URI(uri);
-    GossipSettings gossipSettings = new GossipSettings(gossipInterval, cleanupInterval, windowSize, minSamples,
-        convictThreshold, distribution);
+    GossipSettings gossipSettings = new GossipSettings(gossipInterval, cleanupInterval, windowSize,
+            minSamples, convictThreshold, distribution, bulkTransfer);
+    gossipSettings.setBulkTransferSize(bulkTransferSize);
     if (transportClass != null) {
       gossipSettings.setTransportManagerClass(transportClass);
     }
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
index adf2530..4bd44f2 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java
@@ -24,6 +24,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.MetricRegistry;
+import org.apache.gossip.GossipSettings;
 import org.apache.gossip.LocalMember;
 import org.apache.gossip.model.ActiveGossipOk;
 import org.apache.gossip.model.PerNodeDataMessage;
@@ -31,40 +32,40 @@
 import org.apache.gossip.model.Response;
 import org.apache.gossip.model.SharedDataMessage;
 import org.apache.gossip.model.ShutdownMessage;
-import org.apache.gossip.udp.UdpActiveGossipMessage;
-import org.apache.gossip.udp.UdpPerNodeDataMessage;
-import org.apache.gossip.udp.UdpSharedDataMessage;
+import org.apache.gossip.udp.*;
 import org.apache.log4j.Logger;
 
 import static com.codahale.metrics.MetricRegistry.name;
 
 /**
- * The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner
+ * The ActiveGossipThread sends information. Pick a random partner and send the membership list to that partner
  */
 public abstract class AbstractActiveGossiper {
 
   protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class);
-  
+
   protected final GossipManager gossipManager;
   protected final GossipCore gossipCore;
   private final Histogram sharedDataHistogram;
   private final Histogram sendPerNodeDataHistogram;
-  private final Histogram sendMembershipHistorgram;
+  private final Histogram sendMembershipHistogram;
   private final Random random;
+  private final GossipSettings gossipSettings;
 
   public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
     this.gossipManager = gossipManager;
     this.gossipCore = gossipCore;
     sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time"));
     sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time"));
-    sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time"));
+    sendMembershipHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistogram-time"));
     random = new Random();
+    gossipSettings = gossipManager.getSettings();
   }
 
   public void init() {
 
   }
-  
+
   public void shutdown() {
 
   }
@@ -78,12 +79,22 @@
     m.setShutdownAtNanos(gossipManager.getClock().nanoTime());
     gossipCore.sendOneWay(m, target.getUri());
   }
-  
-  public final void sendSharedData(LocalMember me, LocalMember member){
-    if (member == null){
+
+  public final void sendSharedData(LocalMember me, LocalMember member) {
+    if (member == null) {
       return;
     }
     long startTime = System.currentTimeMillis();
+    if (gossipSettings.isBulkTransfer()) {
+      sendSharedDataInBulkInternal(me, member);
+    } else {
+      sendSharedDataInternal(me, member);
+    }
+    sharedDataHistogram.update(System.currentTimeMillis() - startTime);
+  }
+
+  /** Send shared data one entry at a time. */
+  private void sendSharedDataInternal(LocalMember me, LocalMember member) {
     for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
       if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
               .shouldReplicate(me, member, innerEntry.getValue())) {
@@ -92,22 +103,60 @@
       UdpSharedDataMessage message = new UdpSharedDataMessage();
       message.setUuid(UUID.randomUUID().toString());
       message.setUriFrom(me.getId());
-      message.setExpireAt(innerEntry.getValue().getExpireAt());
-      message.setKey(innerEntry.getValue().getKey());
-      message.setNodeId(innerEntry.getValue().getNodeId());
-      message.setTimestamp(innerEntry.getValue().getTimestamp());
-      message.setPayload(innerEntry.getValue().getPayload());
-      message.setReplicable(innerEntry.getValue().getReplicable());
+      copySharedDataMessage(innerEntry.getValue(), message);
       gossipCore.sendOneWay(message, member.getUri());
     }
-    sharedDataHistogram.update(System.currentTimeMillis() - startTime);
   }
-  
+
+  /** Send shared data in batches; a bulk size below 1 degrades to one entry per message. */
+  private void sendSharedDataInBulkInternal(LocalMember me, LocalMember member) {
+    UdpSharedDataBulkMessage udpMessage = new UdpSharedDataBulkMessage();
+    udpMessage.setUuid(UUID.randomUUID().toString());
+    udpMessage.setUriFrom(me.getId());
+    for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()) {
+      if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
+              .shouldReplicate(me, member, innerEntry.getValue())) {
+        continue; // this entry's Replicable vetoed replication to the target
+      }
+      SharedDataMessage message = new SharedDataMessage();
+      copySharedDataMessage(innerEntry.getValue(), message);
+      udpMessage.addMessage(message);
+      if (udpMessage.getMessages().size() >= gossipSettings.getBulkTransferSize()) {
+        gossipCore.sendOneWay(udpMessage, member.getUri());
+        udpMessage = new UdpSharedDataBulkMessage(); // next batch gets its own uuid
+        udpMessage.setUuid(UUID.randomUUID().toString());
+        udpMessage.setUriFrom(me.getId());
+      }
+    }
+    if (!udpMessage.getMessages().isEmpty()) { // flush the partially filled tail batch
+      gossipCore.sendOneWay(udpMessage, member.getUri());
+    }
+  }
+
+  private void copySharedDataMessage(SharedDataMessage original, SharedDataMessage copy) {
+    copy.setExpireAt(original.getExpireAt());
+    copy.setKey(original.getKey());
+    copy.setNodeId(original.getNodeId());
+    copy.setTimestamp(original.getTimestamp());
+    copy.setPayload(original.getPayload());
+    copy.setReplicable(original.getReplicable());
+  }
+
   public final void sendPerNodeData(LocalMember me, LocalMember member){
     if (member == null){
       return;
     }
     long startTime = System.currentTimeMillis();
+    if (gossipSettings.isBulkTransfer()) {
+      sendPerNodeDataInBulkInternal(me, member);
+    } else {
+      sendPerNodeDataInternal(me, member);
+    }
+    sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
+  }
+
+  /** Send per node data one entry at a time. */
+  private void sendPerNodeDataInternal(LocalMember me, LocalMember member) {
     for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
       for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()){
         if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
@@ -117,18 +166,49 @@
         UdpPerNodeDataMessage message = new UdpPerNodeDataMessage();
         message.setUuid(UUID.randomUUID().toString());
         message.setUriFrom(me.getId());
-        message.setExpireAt(innerEntry.getValue().getExpireAt());
-        message.setKey(innerEntry.getValue().getKey());
-        message.setNodeId(innerEntry.getValue().getNodeId());
-        message.setTimestamp(innerEntry.getValue().getTimestamp());
-        message.setPayload(innerEntry.getValue().getPayload());
-        message.setReplicable(innerEntry.getValue().getReplicable());
-        gossipCore.sendOneWay(message, member.getUri());   
+        copyPerNodeDataMessage(innerEntry.getValue(), message);
+        gossipCore.sendOneWay(message, member.getUri());
       }
     }
-    sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
+
   }
-    
+
+  /** Send per node data in batches; a bulk size below 1 degrades to one entry per message. */
+  private void sendPerNodeDataInBulkInternal(LocalMember me, LocalMember member) {
+    for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()) {
+      UdpPerNodeDataBulkMessage udpMessage = new UdpPerNodeDataBulkMessage();
+      udpMessage.setUuid(UUID.randomUUID().toString());
+      udpMessage.setUriFrom(me.getId());
+      for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()) {
+        if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
+                .shouldReplicate(me, member, innerEntry.getValue())) {
+          continue; // this entry's Replicable vetoed replication to the target
+        }
+        PerNodeDataMessage message = new PerNodeDataMessage();
+        copyPerNodeDataMessage(innerEntry.getValue(), message);
+        udpMessage.addMessage(message);
+        if (udpMessage.getMessages().size() >= gossipSettings.getBulkTransferSize()) {
+          gossipCore.sendOneWay(udpMessage, member.getUri());
+          udpMessage = new UdpPerNodeDataBulkMessage(); // next batch gets its own uuid
+          udpMessage.setUuid(UUID.randomUUID().toString());
+          udpMessage.setUriFrom(me.getId());
+        }
+      }
+      if (!udpMessage.getMessages().isEmpty()) { // flush this outer key's tail batch
+        gossipCore.sendOneWay(udpMessage, member.getUri());
+      }
+    }
+  }
+
+  private void copyPerNodeDataMessage(PerNodeDataMessage original, PerNodeDataMessage copy) {
+    copy.setExpireAt(original.getExpireAt());
+    copy.setKey(original.getKey());
+    copy.setNodeId(original.getNodeId());
+    copy.setTimestamp(original.getTimestamp());
+    copy.setPayload(original.getPayload());
+    copy.setReplicable(original.getReplicable());
+  }
+
   /**
    * Performs the sending of the membership list, after we have incremented our own heartbeat.
    */
@@ -151,9 +231,9 @@
     } else {
       LOGGER.debug("Message " + message + " generated response " + r);
     }
-    sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
+    sendMembershipHistogram.update(System.currentTimeMillis() - startTime);
   }
-    
+
   protected final Member convert(LocalMember member){
     Member gm = new Member();
     gm.setCluster(member.getClusterName());
@@ -163,9 +243,9 @@
     gm.setProperties(member.getProperties());
     return gm;
   }
-  
+
   /**
-   * 
+   *
    * @param memberList
    *          An immutable list
    * @return The chosen LocalGossipMember to gossip with.
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java
index fff9430..abce76d 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java
@@ -20,37 +20,36 @@
 
 import org.apache.gossip.manager.GossipCore;
 import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.ActiveGossipMessage;
-import org.apache.gossip.model.Base;
-import org.apache.gossip.model.PerNodeDataMessage;
-import org.apache.gossip.model.Response;
-import org.apache.gossip.model.SharedDataMessage;
-import org.apache.gossip.model.ShutdownMessage;
+import org.apache.gossip.model.*;
 
 import java.util.Arrays;
 
 public class MessageHandlerFactory {
-  
+
   public static MessageHandler defaultHandler() {
     return concurrentHandler(
         new TypedMessageHandler(Response.class, new ResponseHandler()),
         new TypedMessageHandler(ShutdownMessage.class, new ShutdownMessageHandler()),
         new TypedMessageHandler(PerNodeDataMessage.class, new PerNodeDataMessageHandler()),
         new TypedMessageHandler(SharedDataMessage.class, new SharedDataMessageHandler()),
-        new TypedMessageHandler(ActiveGossipMessage.class, new ActiveGossipMessageHandler())
+        new TypedMessageHandler(ActiveGossipMessage.class, new ActiveGossipMessageHandler()),
+        new TypedMessageHandler(PerNodeDataBulkMessage.class, new PerNodeDataBulkMessageHandler()),
+        new TypedMessageHandler(SharedDataBulkMessage.class, new SharedDataBulkMessageHandler())
     );
   }
-  
+
   public static MessageHandler concurrentHandler(MessageHandler... handlers) {
-    if (handlers == null) throw new NullPointerException("handlers cannot be null");
+    if (handlers == null)
+      throw new NullPointerException("handlers cannot be null");
     if (Arrays.asList(handlers).stream().filter(i -> i != null).count() != handlers.length) {
       throw new NullPointerException("found at least one null handler");
     }
     return new MessageHandler() {
-      @Override
-      public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+      @Override public boolean invoke(GossipCore gossipCore, GossipManager gossipManager,
+              Base base) {
         // return true if at least one of the component handlers return true.
-        return Arrays.asList(handlers).stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
+        return Arrays.asList(handlers).stream()
+                .filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
       }
     };
   }
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataBulkMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataBulkMessageHandler.java
new file mode 100644
index 0000000..37024e9
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataBulkMessageHandler.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.udp.UdpPerNodeDataBulkMessage;
+
+public class PerNodeDataBulkMessageHandler implements MessageHandler {
+
+  /**
+   * Passes every {@link PerNodeDataMessage} of a bulk message to {@code gossipCore}.
+   * @param gossipCore receives each entry through {@code addPerNodeData}.
+   * @param gossipManager not used by this handler.
+   * @param base the bulk message; cast to {@link UdpPerNodeDataBulkMessage} without a check.
+   * @return always {@code true}.
+   */
+  @Override
+  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+    UdpPerNodeDataBulkMessage bulk = (UdpPerNodeDataBulkMessage) base;
+    bulk.getMessages().forEach(entry -> gossipCore.addPerNodeData(entry));
+    return true;
+  }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
index 0ad0d91..4ac47b9 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
@@ -23,7 +23,7 @@
 import org.apache.gossip.udp.UdpPerNodeDataMessage;
 
 public class PerNodeDataMessageHandler implements MessageHandler {
-  
+
   /**
    * @param gossipCore context.
    * @param gossipManager context.
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataBulkMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataBulkMessageHandler.java
new file mode 100644
index 0000000..a062f95
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataBulkMessageHandler.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.model.SharedDataMessage;
+import org.apache.gossip.udp.UdpSharedDataBulkMessage;
+
+public class SharedDataBulkMessageHandler implements MessageHandler {
+
+  /**
+   * Passes every {@link SharedDataMessage} of a bulk message to {@code gossipCore}.
+   * @param gossipCore receives each entry through {@code addSharedData}.
+   * @param gossipManager not used by this handler.
+   * @param base the bulk message; cast to {@link UdpSharedDataBulkMessage} without a check.
+   * @return always {@code true}.
+   */
+  @Override
+  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+    UdpSharedDataBulkMessage bulk = (UdpSharedDataBulkMessage) base;
+    bulk.getMessages().forEach(entry -> gossipCore.addSharedData(entry));
+    return true;
+  }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/model/Base.java b/gossip-base/src/main/java/org/apache/gossip/model/Base.java
index 1b66310..e9183b0 100644
--- a/gossip-base/src/main/java/org/apache/gossip/model/Base.java
+++ b/gossip-base/src/main/java/org/apache/gossip/model/Base.java
@@ -19,9 +19,9 @@
 
 import org.apache.gossip.udp.UdpActiveGossipMessage;
 import org.apache.gossip.udp.UdpActiveGossipOk;
-import org.apache.gossip.udp.UdpPerNodeDataMessage;
+import org.apache.gossip.udp.UdpPerNodeDataBulkMessage;
 import org.apache.gossip.udp.UdpNotAMemberFault;
-import org.apache.gossip.udp.UdpSharedDataMessage;
+import org.apache.gossip.udp.UdpSharedDataBulkMessage;
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
@@ -40,9 +40,9 @@
         @Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"),
         @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"),
         @Type(value = PerNodeDataMessage.class, name = "PerNodeDataMessage"),
-        @Type(value = UdpPerNodeDataMessage.class, name = "UdpPerNodeDataMessage"),
+        @Type(value = UdpPerNodeDataBulkMessage.class, name = "UdpPerNodeDataMessage"),
         @Type(value = SharedDataMessage.class, name = "SharedDataMessage"),
-        @Type(value = UdpSharedDataMessage.class, name = "UdpSharedDataMessage")
+        @Type(value = UdpSharedDataBulkMessage.class, name = "UdpSharedDataMessage")
         })
 public class Base {
 
diff --git a/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataBulkMessage.java b/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataBulkMessage.java
new file mode 100644
index 0000000..bb138a5
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataBulkMessage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.model;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PerNodeDataBulkMessage extends Base {
+  private List<PerNodeDataMessage> messages = new ArrayList<>();
+
+  public void addMessage(PerNodeDataMessage msg) {
+    messages.add(msg);
+  }
+
+  public List<PerNodeDataMessage> getMessages() {
+    return messages;
+  }
+
+  @Override public String toString() {
+    return messages.stream().map(Object::toString) // entries, comma-separated
+            .collect(Collectors.joining(",", "GossipDataBulkMessage[", "]"));
+  }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/model/SharedDataBulkMessage.java b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataBulkMessage.java
new file mode 100644
index 0000000..7b67430
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataBulkMessage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.model;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SharedDataBulkMessage extends Base {
+  private List<SharedDataMessage> messages = new ArrayList<>();
+
+  public void addMessage(SharedDataMessage msg) {
+    messages.add(msg);
+  }
+
+  public List<SharedDataMessage> getMessages() {
+    return messages;
+  }
+
+  @Override public String toString() {
+    return messages.stream().map(Object::toString) // entries, comma-separated
+            .collect(Collectors.joining(",", "SharedGossipDataBulkMessage[", "]"));
+  }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java
index 4b1a1ea..e148189 100644
--- a/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java
+++ b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java
@@ -28,7 +28,7 @@
   private Long timestamp;
   private Long expireAt;
   private Replicable<SharedDataMessage> replicable;
-  
+
   public String getNodeId() {
     return nodeId;
   }
diff --git a/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataBulkMessage.java b/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataBulkMessage.java
new file mode 100644
index 0000000..99eb1e5
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataBulkMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.udp;
+
+import org.apache.gossip.model.PerNodeDataBulkMessage;
+
+public class UdpPerNodeDataBulkMessage extends PerNodeDataBulkMessage implements Trackable {
+
+  private String uriFrom;
+  private String uuid;
+  
+  public String getUriFrom() {
+    return uriFrom;
+  }
+  
+  public void setUriFrom(String uriFrom) {
+    this.uriFrom = uriFrom;
+  }
+  
+  public String getUuid() {
+    return uuid;
+  }
+  
+  public void setUuid(String uuid) {
+    this.uuid = uuid;
+  }
+
+  @Override
+  public String toString() {
+    return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid
+            + ", messages=[" + super.toString() + "] ]";
+  }
+
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataBulkMessage.java b/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataBulkMessage.java
new file mode 100644
index 0000000..8dc8be1
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataBulkMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.udp;
+
+import org.apache.gossip.model.SharedDataBulkMessage;
+
+public class UdpSharedDataBulkMessage extends SharedDataBulkMessage implements Trackable {
+
+  private String uriFrom;
+  private String uuid;
+  
+  public String getUriFrom() {
+    return uriFrom;
+  }
+  
+  public void setUriFrom(String uriFrom) {
+    this.uriFrom = uriFrom;
+  }
+  
+  public String getUuid() {
+    return uuid;
+  }
+  
+  public void setUuid(String uuid) {
+    this.uuid = uuid;
+  }
+
+  @Override
+  public String toString() { // envelope fields first, then the batched entries
+    return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid
+            + ", messages=[" + super.toString() + "] ]";
+  }
+
+}
diff --git a/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java
index 59136d1..cc269f9 100644
--- a/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java
+++ b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java
@@ -23,19 +23,17 @@
 import org.apache.gossip.model.PerNodeDataMessage;
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.platform.runner.JUnitPlatform;
 import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
-@RunWith(JUnitPlatform.class)
+@RunWith(Parameterized.class)
 public class PerNodeDataEventTest extends AbstractIntegrationBase {
   
   private String receivedKey = "";
@@ -43,25 +41,39 @@
   private Object receivingNodeDataNewValue = "";
   private Object receivingNodeDataOldValue = "";
   private Semaphore lock = new Semaphore(0);
-  
-  
+  private int base;
+  private boolean bulkTransfer;
+
+  public PerNodeDataEventTest(int base, boolean bulkTransfer) { // receives one row of data()
+    this.base = base; // added to the node index to form each node's UDP port
+    this.bulkTransfer = bulkTransfer; // passed to GossipSettings.setBulkTransfer in the test
+  }
+
+  @Parameterized.Parameters(name = "{index} bulkTransfer={1}")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+            {50000, false}, {55000, true}
+    });
+  }
+
   @Test
   public void perNodeDataEventTest()
           throws InterruptedException, UnknownHostException, URISyntaxException {
     GossipSettings settings = new GossipSettings();
     settings.setPersistRingState(false);
     settings.setPersistDataState(false);
+    settings.setBulkTransfer(bulkTransfer);
     String cluster = UUID.randomUUID().toString();
     int seedNodes = 1;
     List<Member> startupMembers = new ArrayList<>();
     for (int i = 1; i < seedNodes + 1; ++i) {
-      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
       startupMembers.add(new RemoteMember(cluster, uri, i + ""));
     }
     final List<GossipManager> clients = new ArrayList<>();
     final int clusterMembers = 2;
     for (int i = 1; i < clusterMembers + 1; ++i) {
-      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
       GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
               .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
       clients.add(gossipService);
diff --git a/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java b/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java
index 8dbfcb3..547cd45 100644
--- a/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java
+++ b/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java
@@ -24,19 +24,17 @@
 import org.apache.gossip.model.SharedDataMessage;
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.platform.runner.JUnitPlatform;
 import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
-@RunWith(JUnitPlatform.class)
+@RunWith(Parameterized.class)
 public class SharedDataEventTest extends AbstractIntegrationBase {
   
   private String receivedKey = "";
@@ -44,6 +42,20 @@
   private Object receivingNodeDataOldValue = "";
   private String gCounterKey = "gCounter";
   private Semaphore lock = new Semaphore(0);
+  private int base;
+  private boolean bulkTransfer;
+
+  public SharedDataEventTest(int base, boolean bulkTransfer) { // receives one row of data()
+    this.base = base; // added to the node index to form each node's UDP port
+    this.bulkTransfer = bulkTransfer; // passed to GossipSettings.setBulkTransfer in the test
+  }
+
+  @Parameterized.Parameters(name = "{index} bulkTransfer={1}")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+            {50000, false}, {55000, true}
+    });
+  }
   
   @Test
   public void sharedDataEventTest()
@@ -51,17 +63,18 @@
     GossipSettings settings = new GossipSettings();
     settings.setPersistRingState(false);
     settings.setPersistDataState(false);
+    settings.setBulkTransfer(bulkTransfer);
     String cluster = UUID.randomUUID().toString();
     int seedNodes = 1;
     List<Member> startupMembers = new ArrayList<>();
     for (int i = 1; i < seedNodes + 1; ++i) {
-      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
       startupMembers.add(new RemoteMember(cluster, uri, i + ""));
     }
     final List<GossipManager> clients = new ArrayList<>();
     final int clusterMembers = 2;
     for (int i = 1; i < clusterMembers + 1; ++i) {
-      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
       GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
               .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
       clients.add(gossipService);
@@ -126,13 +139,13 @@
     int seedNodes = 1;
     List<Member> startupMembers = new ArrayList<>();
     for (int i = 1; i < seedNodes + 1; ++i) {
-      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
       startupMembers.add(new RemoteMember(cluster, uri, i + ""));
     }
     final List<GossipManager> clients = new ArrayList<>();
     final int clusterMembers = 3;
     for (int i = 1; i < clusterMembers + 1; ++i) {
-      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
       GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
               .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
       clients.add(gossipService);
diff --git a/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index dd5bfe9..e5c3639 100644
--- a/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -49,7 +49,7 @@
   @Test
   public void DeadNodesDoNotComeAliveAgain()
           throws InterruptedException, UnknownHostException, URISyntaxException {
-    GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal");
+    GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal", false);
     settings.setPersistRingState(false);
     settings.setPersistDataState(false);
     String cluster = UUID.randomUUID().toString();
diff --git a/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index 8ae783e..eb9abcd 100644
--- a/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -47,7 +47,7 @@
   }
 
   public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException {
-    GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential");
+    GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential", false);
     settings.setPersistRingState(false);
     settings.setPersistDataState(false);
     String cluster = UUID.randomUUID().toString();