GOSSIP-56 GossipCore should allow registration of handlers

MessageInvoker idea. Returns true when it managed to invoke one of
handlers. User can build any structure of handlers.
See tests: MessageInvokerTest.
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index 6f97a74..403acf4 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -17,6 +17,17 @@
  */
 package org.apache.gossip.manager;
 
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.event.GossipState;
+import org.apache.gossip.model.*;
+import org.apache.gossip.udp.Trackable;
+import org.apache.log4j.Logger;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -24,53 +35,15 @@
 import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.security.KeyFactory;
-import java.security.NoSuchAlgorithmException;
-import java.security.NoSuchProviderException;
-import java.security.PrivateKey;
-import java.security.Signature;
-import java.security.SignatureException;
+import java.security.*;
 import java.security.spec.InvalidKeySpecException;
 import java.security.spec.PKCS8EncodedKeySpec;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map.Entry;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.gossip.GossipMember;
-import org.apache.gossip.LocalGossipMember;
-import org.apache.gossip.RemoteGossipMember;
-import org.apache.gossip.event.GossipState;
-import org.apache.gossip.model.ActiveGossipMessage;
-import org.apache.gossip.model.Base;
-import org.apache.gossip.model.GossipDataMessage;
-import org.apache.gossip.model.Response;
-import org.apache.gossip.model.SharedGossipDataMessage;
-import org.apache.gossip.model.ShutdownMessage;
-import org.apache.gossip.model.SignedPayload;
-import org.apache.gossip.udp.Trackable;
-import org.apache.gossip.udp.UdpActiveGossipMessage;
-import org.apache.gossip.udp.UdpActiveGossipOk;
-import org.apache.gossip.udp.UdpGossipDataMessage;
-import org.apache.gossip.udp.UdpNotAMemberFault;
-import org.apache.gossip.udp.UdpSharedGossipDataMessage;
-import org.apache.log4j.Logger;
-
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricRegistry;
+import java.util.concurrent.*;
 
 public class GossipCore implements GossipCoreConstants {
-  
+
   public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
   private final GossipManager gossipManager;
   private ConcurrentHashMap<String, Base> requests;
@@ -83,7 +56,7 @@
   private final Meter messageSerdeException;
   private final Meter tranmissionException;
   private final Meter tranmissionSuccess;
-    
+
   public GossipCore(GossipManager manager, MetricRegistry metrics){
     this.gossipManager = manager;
     requests = new ConcurrentHashMap<>();
@@ -100,6 +73,7 @@
     messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
     tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
     tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
+
     if (manager.getSettings().isSignMessages()){
       File privateKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId());
       File publicKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId() + ".pub");
@@ -124,7 +98,7 @@
       privKey = null;
     }
   }
-  
+
   private byte [] sign(byte [] bytes){
     Signature dsa;
     try {
@@ -136,7 +110,7 @@
       throw new RuntimeException(e);
     } 
   }
-  
+
   public void addSharedData(SharedGossipDataMessage message){
      SharedGossipDataMessage previous = sharedData.get(message.getKey());
      if (previous == null){
@@ -163,11 +137,11 @@
       }
     }
   }
-  
+
   public ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> getPerNodeData(){
     return perNodeData;
   }
-  
+
   public ConcurrentHashMap<String, SharedGossipDataMessage> getSharedData() {
     return sharedData;
   }
@@ -181,74 +155,15 @@
     }
     service.shutdownNow();
   }
-  
-  public void receive(Base base){
-    if (base instanceof Response){
-      if (base instanceof Trackable){
-        Trackable t = (Trackable) base;
-        requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
-      }
-    }
-    if (base instanceof ShutdownMessage){
-      ShutdownMessage s = (ShutdownMessage) base;
-      GossipDataMessage m = new GossipDataMessage();
-      m.setKey(ShutdownMessage.PER_NODE_KEY);
-      m.setNodeId(s.getNodeId());
-      m.setPayload(base);
-      m.setTimestamp(System.currentTimeMillis());
-      m.setExpireAt(System.currentTimeMillis() + 30L * 1000L);
-      addPerNodeData(m);
-    }
-    if (base instanceof GossipDataMessage) {
-      UdpGossipDataMessage message = (UdpGossipDataMessage) base;
-      addPerNodeData(message);
-    }
-    if (base instanceof SharedGossipDataMessage){
-      UdpSharedGossipDataMessage message = (UdpSharedGossipDataMessage) base;
-      addSharedData(message);
-    }
-    if (base instanceof ActiveGossipMessage){
-      List<GossipMember> remoteGossipMembers = new ArrayList<>();
-      RemoteGossipMember senderMember = null;
-      UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
-      for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
-        URI u = null;
-        try {
-          u = new URI(activeGossipMessage.getMembers().get(i).getUri());
-        } catch (URISyntaxException e) {
-          LOGGER.debug("Gossip message with faulty URI", e);
-          continue;
-        }
-        RemoteGossipMember member = new RemoteGossipMember(
-                activeGossipMessage.getMembers().get(i).getCluster(),
-                u,
-                activeGossipMessage.getMembers().get(i).getId(),
-                activeGossipMessage.getMembers().get(i).getHeartbeat(),
-                activeGossipMessage.getMembers().get(i).getProperties());
-        if (i == 0) {
-          senderMember = member;
-        } 
-        if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))){
-          UdpNotAMemberFault f = new UdpNotAMemberFault();
-          f.setException("Not a member of this cluster " + i);
-          f.setUriFrom(activeGossipMessage.getUriFrom());
-          f.setUuid(activeGossipMessage.getUuid());
-          LOGGER.warn(f);
-          sendOneWay(f, member.getUri());
-          continue;
-        }
-        remoteGossipMembers.add(member);
-      }
-      UdpActiveGossipOk o = new UdpActiveGossipOk();
-      o.setUriFrom(activeGossipMessage.getUriFrom());
-      o.setUuid(activeGossipMessage.getUuid());
-      sendOneWay(o, senderMember.getUri());
-      mergeLists(gossipManager, senderMember, remoteGossipMembers);
+
+  public void receive(Base base) {
+    if (!gossipManager.getMessageInvoker().invoke(this, gossipManager, base)) {
+      LOGGER.warn("received message can not be handled");
     }
   }
-  
+
   /**
-   * Sends a blocking message.  
+   * Sends a blocking message.
    * @param message
    * @param uri
    * @throws RuntimeException if data can not be serialized or in transmission error
@@ -277,15 +192,15 @@
     } catch (IOException e) {
       tranmissionException.mark();
       throw new RuntimeException(e);
-    } 
+    }
   }
-  
+
   public Response send(Base message, URI uri){
     if (LOGGER.isDebugEnabled()){
-      LOGGER.debug("Sending " + message);  
+      LOGGER.debug("Sending " + message);
       LOGGER.debug("Current request queue " + requests);
     }
-    
+
     final Trackable t;
     if (message instanceof Trackable){
       t = (Trackable) message;
@@ -307,12 +222,12 @@
           try {
             Thread.sleep(0, 555555);
           } catch (InterruptedException e) {
-            
+
           }
         }
       }
     });
-    
+
     try {
       //TODO this needs to be a setting base on attempts/second
       return response.get(1, TimeUnit.SECONDS);
@@ -324,14 +239,14 @@
     } catch (TimeoutException e) {
       boolean cancelled = response.cancel(true);
       LOGGER.debug(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled));
-      return null; 
+      return null;
     } finally {
       if (t != null){
         requests.remove(t.getUuid() + "/" + t.getUriFrom());
       }
     }
   }
-  
+
   /**
    * Sends a message across the network while blocking. Catches and ignores IOException in transmission. Used
    * when the protocol for the message is not to wait for a response
@@ -359,21 +274,25 @@
       DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, u.getPort());
       socket.send(datagramPacket);
       tranmissionSuccess.mark();
-    } catch (IOException ex) { 
+    } catch (IOException ex) {
       tranmissionException.mark();
       LOGGER.debug("Send one way failed", ex);
     }
   }
 
+  public void addRequest(String k, Base v) {
+    requests.put(k, v);
+  }
+
   /**
    * Merge lists from remote members and update heartbeats
-   * 
+   *
    * @param gossipManager
    * @param senderMember
    * @param remoteList
-   * 
+   *
    */
-  protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
+  public void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
           List<GossipMember> remoteList) {
     if (LOGGER.isDebugEnabled()){
       debugState(senderMember, remoteList);
@@ -390,10 +309,10 @@
       if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
         continue;
       }
-      LocalGossipMember aNewMember = new LocalGossipMember(remoteMember.getClusterName(), 
-      remoteMember.getUri(), 
-      remoteMember.getId(), 
-      remoteMember.getHeartbeat(), 
+      LocalGossipMember aNewMember = new LocalGossipMember(remoteMember.getClusterName(),
+      remoteMember.getUri(),
+      remoteMember.getId(),
+      remoteMember.getHeartbeat(),
       remoteMember.getProperties(),
       gossipManager.getSettings().getWindowSize(),
       gossipManager.getSettings().getMinimumSamples(),
@@ -414,11 +333,11 @@
       debugState(senderMember, remoteList);
     }
   }
-  
+
   private void debugState(RemoteGossipMember senderMember,
           List<GossipMember> remoteList){
     LOGGER.warn(
-          "-----------------------\n" + 
+          "-----------------------\n" +
           "Me " + gossipManager.getMyself() + "\n" +
           "Sender " + senderMember + "\n" +
           "RemoteList " + remoteList + "\n" +
@@ -426,5 +345,4 @@
           "Dead " + gossipManager.getDeadMembers()+ "\n" +
           "=======================");
   }
- 
 }
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 9221aa6..ab8e4ae 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -19,6 +19,17 @@
 
 import com.codahale.metrics.MetricRegistry;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
+import org.apache.gossip.manager.handlers.MessageInvoker;
+import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
+import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.gossip.model.ShutdownMessage;
+import org.apache.log4j.Logger;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -28,28 +39,10 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
-import org.apache.log4j.Logger;
-
-import org.apache.gossip.GossipMember;
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.LocalGossipMember;
-import org.apache.gossip.event.GossipListener;
-import org.apache.gossip.event.GossipState;
-import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
-
-import org.apache.gossip.model.GossipDataMessage;
-import org.apache.gossip.model.SharedGossipDataMessage;
-import org.apache.gossip.model.ShutdownMessage;
-
 
 public abstract class GossipManager {
 
@@ -71,11 +64,15 @@
   private final RingStatePersister ringState;
   private final UserDataPersister userDataState;
   private final ObjectMapper objectMapper;
-  
+
+  private final MessageInvoker messageInvoker;
+
   public GossipManager(String cluster,
-          URI uri, String id, Map<String,String> properties, GossipSettings settings,
-          List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) {
+                       URI uri, String id, Map<String, String> properties, GossipSettings settings,
+                       List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry,
+                       ObjectMapper objectMapper, MessageInvoker messageInvoker) {
     this.settings = settings;
+    this.messageInvoker = messageInvoker;
     clock = new SystemClock();    
     me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties,
             settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
@@ -104,6 +101,10 @@
     readSavedDataState();
   }
 
+  public MessageInvoker getMessageInvoker() {
+    return messageInvoker;
+  }
+
   public ConcurrentSkipListMap<LocalGossipMember, GossipState> getMembers() {
     return members;
   }
diff --git a/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
new file mode 100644
index 0000000..54aa40c
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.gossip.udp.UdpActiveGossipOk;
+import org.apache.gossip.udp.UdpNotAMemberFault;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActiveGossipMessageHandler implements MessageHandler {
+  @Override
+  public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+    List<GossipMember> remoteGossipMembers = new ArrayList<>();
+    RemoteGossipMember senderMember = null;
+    UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
+    for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
+      URI u;
+      try {
+        u = new URI(activeGossipMessage.getMembers().get(i).getUri());
+      } catch (URISyntaxException e) {
+        GossipCore.LOGGER.debug("Gossip message with faulty URI", e);
+        continue;
+      }
+      RemoteGossipMember member = new RemoteGossipMember(
+              activeGossipMessage.getMembers().get(i).getCluster(),
+              u,
+              activeGossipMessage.getMembers().get(i).getId(),
+              activeGossipMessage.getMembers().get(i).getHeartbeat(),
+              activeGossipMessage.getMembers().get(i).getProperties());
+      if (i == 0) {
+        senderMember = member;
+      }
+      if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))) {
+        UdpNotAMemberFault f = new UdpNotAMemberFault();
+        f.setException("Not a member of this cluster " + i);
+        f.setUriFrom(activeGossipMessage.getUriFrom());
+        f.setUuid(activeGossipMessage.getUuid());
+        GossipCore.LOGGER.warn(f);
+        gossipCore.sendOneWay(f, member.getUri());
+        continue;
+      }
+      remoteGossipMembers.add(member);
+    }
+    UdpActiveGossipOk o = new UdpActiveGossipOk();
+    o.setUriFrom(activeGossipMessage.getUriFrom());
+    o.setUuid(activeGossipMessage.getUuid());
+    gossipCore.sendOneWay(o, senderMember.getUri());
+    gossipCore.mergeLists(gossipManager, senderMember, remoteGossipMembers);
+  }
+}
diff --git a/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java b/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java
new file mode 100644
index 0000000..034691d
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.*;
+
+public class DefaultMessageInvoker implements MessageInvoker {
+  private final MessageInvokerCombiner mic;
+
+  public DefaultMessageInvoker() {
+    mic = new MessageInvokerCombiner();
+    mic.add(new SimpleMessageInvoker(Response.class, new ResponseHandler()));
+    mic.add(new SimpleMessageInvoker(ShutdownMessage.class, new ShutdownMessageHandler()));
+    mic.add(new SimpleMessageInvoker(GossipDataMessage.class, new GossipDataMessageHandler()));
+    mic.add(new SimpleMessageInvoker(SharedGossipDataMessage.class, new SharedGossipDataMessageHandler()));
+    mic.add(new SimpleMessageInvoker(ActiveGossipMessage.class, new ActiveGossipMessageHandler()));
+  }
+
+  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+    return mic.invoke(gossipCore, gossipManager, base);
+  }
+}
diff --git a/src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java
new file mode 100644
index 0000000..edf2579
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.udp.UdpGossipDataMessage;
+
+public class GossipDataMessageHandler implements MessageHandler {
+  @Override
+  public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+    UdpGossipDataMessage message = (UdpGossipDataMessage) base;
+    gossipCore.addPerNodeData(message);
+  }
+}
diff --git a/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java
new file mode 100644
index 0000000..4b5d49d
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
+
+public interface MessageHandler {
+  void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base);
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java b/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java
new file mode 100644
index 0000000..b4a39e3
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
+
+public interface MessageInvoker {
+  boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base);
+}
diff --git a/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java b/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
new file mode 100644
index 0000000..7b654f6
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class MessageInvokerCombiner implements MessageInvoker {
+  private List<MessageInvoker> invokers;
+
+  public MessageInvokerCombiner() {
+  }
+
+  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+    if (invokers == null) {
+      return false;
+    }
+    boolean result = false;
+    for (MessageInvoker mi : invokers) {
+      result = mi.invoke(gossipCore, gossipManager, base) || result;
+    }
+    return result;
+  }
+
+  public void clear() {
+    invokers = null;
+  }
+
+  public void add(MessageInvoker mi) {
+    if (mi == null) {
+      throw new NullPointerException();
+    }
+    if (invokers == null) {
+      invokers = new CopyOnWriteArrayList<>();
+    }
+    invokers.add(mi);
+  }
+}
diff --git a/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java b/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
new file mode 100644
index 0000000..ad1c2aa
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.ShutdownMessage;
+import org.apache.gossip.udp.Trackable;
+
+public class ResponseHandler implements MessageHandler {
+  @Override
+  public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+    if (base instanceof Trackable) {
+      Trackable t = (Trackable) base;
+      gossipCore.addRequest(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
+    }
+  }
+}
diff --git a/src/main/java/org/apache/gossip/manager/handlers/SharedGossipDataMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/SharedGossipDataMessageHandler.java
new file mode 100644
index 0000000..e9d5343
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/handlers/SharedGossipDataMessageHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.udp.UdpSharedGossipDataMessage;
+
+public class SharedGossipDataMessageHandler implements MessageHandler{
+  @Override
+  public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+    UdpSharedGossipDataMessage message = (UdpSharedGossipDataMessage) base;
+    gossipCore.addSharedData(message);
+  }
+}
diff --git a/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java
new file mode 100644
index 0000000..c4adea2
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.ShutdownMessage;
+
+public class ShutdownMessageHandler implements MessageHandler {
+  @Override
+  public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+    ShutdownMessage s = (ShutdownMessage) base;
+    GossipDataMessage m = new GossipDataMessage();
+    m.setKey(ShutdownMessage.PER_NODE_KEY);
+    m.setNodeId(s.getNodeId());
+    m.setPayload(base);
+    m.setTimestamp(System.currentTimeMillis());
+    m.setExpireAt(System.currentTimeMillis() + 30L * 1000L);
+    gossipCore.addPerNodeData(m);
+  }
+}
diff --git a/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java b/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java
new file mode 100644
index 0000000..0f410d2
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
+
+public class SimpleMessageInvoker implements MessageInvoker {
+  final private Class<?> messageClass;
+  final private MessageHandler messageHandler;
+
+  public SimpleMessageInvoker(Class<?> messageClass, MessageHandler messageHandler) {
+    if (messageClass == null || messageHandler == null) {
+      throw new NullPointerException();
+    }
+    this.messageClass = messageClass;
+    this.messageHandler = messageHandler;
+  }
+
+  @Override
+  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+    if (messageClass.isAssignableFrom(base.getClass())) {
+      messageHandler.invoke(gossipCore, gossipManager, base);
+      return true;
+    } else {
+      return false;
+    }
+  }
+}
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
index 00e3378..bf8a8c3 100644
--- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
@@ -19,17 +19,18 @@
 
 import com.codahale.metrics.MetricRegistry;
 import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.gossip.GossipMember;
 import org.apache.gossip.GossipSettings;
 import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
+import org.apache.gossip.manager.handlers.MessageInvoker;
 
 import java.net.URI;
-import java.util.List;
-import java.util.Map;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class RandomGossipManager extends GossipManager {
 
@@ -47,6 +48,7 @@
     private MetricRegistry registry;
     private Map<String,String> properties;
     private ObjectMapper objectMapper;
+    private MessageInvoker messageInvoker;
 
     private ManagerBuilder() {}
 
@@ -100,7 +102,12 @@
       this.objectMapper = objectMapper;
       return this;
     }
-    
+
+    public ManagerBuilder messageInvoker(MessageInvoker messageInvoker) {
+      this.messageInvoker = messageInvoker;
+      return this;
+    }
+
     public RandomGossipManager build() {
       checkArgument(id != null, "You must specify an id");
       checkArgument(cluster != null, "You must specify a cluster name");
@@ -120,12 +127,15 @@
         objectMapper = new ObjectMapper();
         objectMapper.enableDefaultTyping();
       }
-      return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper);
+      if (messageInvoker == null) {
+        messageInvoker = new DefaultMessageInvoker();
+      }
+      return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker);
     }
   }
 
   private RandomGossipManager(String cluster, URI uri, String id, Map<String,String> properties,  GossipSettings settings, 
-          List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) {
-    super(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper);
+          List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper, MessageInvoker messageInvoker) {
+    super(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker);
   }
 }
diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
index cf38492..2d04087 100644
--- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
+++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
@@ -21,11 +21,17 @@
 import org.apache.gossip.GossipMember;
 import org.apache.gossip.GossipSettings;
 import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
+import org.apache.gossip.manager.handlers.MessageInvoker;
+import org.apache.gossip.manager.handlers.ResponseHandler;
+import org.apache.gossip.manager.handlers.SimpleMessageInvoker;
 import org.apache.gossip.manager.random.RandomGossipManager;
+import org.junit.Assert;
 import org.junit.jupiter.api.Test;
 import org.junit.platform.runner.JUnitPlatform;
 import org.junit.runner.RunWith;
 
+import javax.xml.ws.Response;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -72,6 +78,31 @@
   }
 
   @Test
+  public void createDefaultMessageInvokerIfNull() throws URISyntaxException {
+    RandomGossipManager gossipManager = RandomGossipManager.newBuilder()
+        .withId("id")
+        .cluster("aCluster")
+        .uri(new URI("udp://localhost:2000"))
+        .settings(new GossipSettings())
+        .messageInvoker(null).registry(new MetricRegistry()).build();
+    assertNotNull(gossipManager.getMessageInvoker());
+    Assert.assertEquals(gossipManager.getMessageInvoker().getClass(), new DefaultMessageInvoker().getClass());
+  }
+
+  @Test
+  public void testMessageInvokerKeeping() throws URISyntaxException {
+    MessageInvoker mi = new SimpleMessageInvoker(Response.class, new ResponseHandler());
+    RandomGossipManager gossipManager = RandomGossipManager.newBuilder()
+        .withId("id")
+        .cluster("aCluster")
+        .uri(new URI("udp://localhost:2000"))
+        .settings(new GossipSettings())
+        .messageInvoker(mi).registry(new MetricRegistry()).build();
+    assertNotNull(gossipManager.getMessageInvoker());
+    Assert.assertEquals(gossipManager.getMessageInvoker(), mi);
+  }
+
+  @Test
   public void useMemberListIfProvided() throws URISyntaxException {
     LocalGossipMember member = new LocalGossipMember(
             "aCluster", new URI("udp://localhost:2000"), "aGossipMember",
diff --git a/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java b/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java
new file mode 100644
index 0000000..d402d59
--- /dev/null
+++ b/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.ActiveGossipMessage;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.udp.UdpSharedGossipDataMessage;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MessageInvokerTest {
+  private class FakeMessage extends Base {
+    public FakeMessage() {
+    }
+  }
+
+  private class FakeMessageData extends Base {
+    public int data;
+
+    public FakeMessageData(int data) {
+      this.data = data;
+    }
+  }
+
+  private class FakeMessageDataHandler implements MessageHandler {
+    public int data;
+
+    public FakeMessageDataHandler() {
+      data = 0;
+    }
+
+    public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+      data = ((FakeMessageData) base).data;
+    }
+  }
+
+  private class FakeMessageHandler implements MessageHandler {
+    public int counter;
+
+    public FakeMessageHandler() {
+      counter = 0;
+    }
+
+    public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+      counter++;
+    }
+  }
+
+  @Test
+  public void testSimpleInvoker() {
+    MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler());
+    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+    Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage()));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testSimpleInvokerNullClassConstructor() {
+    new SimpleMessageInvoker(null, new FakeMessageHandler());
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testSimpleInvokerNullHandlerConstructor() {
+    new SimpleMessageInvoker(FakeMessage.class, null);
+  }
+
+  @Test
+  public void testCallCountSimpleInvoker() {
+    FakeMessageHandler h = new FakeMessageHandler();
+    MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, h);
+    mi.invoke(null, null, new FakeMessage());
+    Assert.assertEquals(1, h.counter);
+    mi.invoke(null, null, new ActiveGossipMessage());
+    Assert.assertEquals(1, h.counter);
+    mi.invoke(null, null, new FakeMessage());
+    Assert.assertEquals(2, h.counter);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void cantAddNullInvoker() {
+    MessageInvokerCombiner mi = new MessageInvokerCombiner();
+    mi.add(null);
+  }
+
+  @Test
+  public void testCombinerClear() {
+    MessageInvokerCombiner mi = new MessageInvokerCombiner();
+    mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler()));
+    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+
+    mi.clear();
+    Assert.assertFalse(mi.invoke(null, null, new FakeMessage()));
+  }
+
+  @Test
+  public void testMessageInvokerCombiner() {
+    //Empty combiner - false result
+    MessageInvokerCombiner mi = new MessageInvokerCombiner();
+    Assert.assertFalse(mi.invoke(null, null, new Base()));
+
+    FakeMessageHandler h = new FakeMessageHandler();
+    mi.add(new SimpleMessageInvoker(FakeMessage.class, h));
+    mi.add(new SimpleMessageInvoker(FakeMessage.class, h));
+
+    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+    Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage()));
+    Assert.assertEquals(2, h.counter);
+
+    //Increase size in runtime. Should be 3 calls: 2+3 = 5
+    mi.add(new SimpleMessageInvoker(FakeMessage.class, h));
+    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+    Assert.assertEquals(5, h.counter);
+  }
+
+  @Test
+  public void testMessageInvokerCombiner2levels() {
+    MessageInvokerCombiner mi = new MessageInvokerCombiner();
+    FakeMessageHandler h = new FakeMessageHandler();
+
+    MessageInvokerCombiner mi1 = new MessageInvokerCombiner();
+    mi1.add(new SimpleMessageInvoker(FakeMessage.class, h));
+    mi1.add(new SimpleMessageInvoker(FakeMessage.class, h));
+
+    MessageInvokerCombiner mi2 = new MessageInvokerCombiner();
+    mi2.add(new SimpleMessageInvoker(FakeMessage.class, h));
+    mi2.add(new SimpleMessageInvoker(FakeMessage.class, h));
+
+    mi.add(mi1);
+    mi.add(mi2);
+
+    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+    Assert.assertEquals(4, h.counter);
+  }
+
+  @Test
+  public void testMessageInvokerCombinerDataShipping() {
+    MessageInvokerCombiner mi = new MessageInvokerCombiner();
+    FakeMessageDataHandler h = new FakeMessageDataHandler();
+    mi.add(new SimpleMessageInvoker(FakeMessageData.class, h));
+
+    Assert.assertTrue(mi.invoke(null, null, new FakeMessageData(101)));
+    Assert.assertEquals(101, h.data);
+  }
+
+  @Test
+  public void testCombiningDefaultInvoker() {
+    MessageInvokerCombiner mi = new MessageInvokerCombiner();
+    mi.add(new DefaultMessageInvoker());
+    mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler()));
+    //UdpSharedGossipDataMessage with null gossipCore -> exception
+    boolean thrown = false;
+    try {
+      mi.invoke(null, null, new UdpSharedGossipDataMessage());
+    } catch (NullPointerException e) {
+      thrown = true;
+    }
+    Assert.assertTrue(thrown);
+    //DefaultInvoker skips FakeMessage and FakeHandler works ok
+    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+  }
+
+}