Use Lambdas instead of Runnables, deprecate old inner classes
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java
index 3045253..8f457f8 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPListener.java
@@ -19,7 +19,6 @@
  * under the License.
  */
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -29,7 +28,6 @@
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketAddress;
-import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.util.Map;
 import java.util.Set;
@@ -72,9 +70,6 @@
     private static final ConcurrentHashMap<String, ILateralCacheListener<?, ?>> instances =
         new ConcurrentHashMap<>();
 
-    /** The socket listener */
-    private ListenerThread receiver;
-
     /** Configuration attributes */
     private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
 
@@ -122,8 +117,7 @@
                     newIns.init();
                     newIns.setCacheManager( cacheMgr );
 
-                    log.info( "Created new listener {0}",
-                            () -> ilca.getTcpListenerPort() );
+                    log.info("Created new listener {0}", () -> ilca.getTcpListenerPort());
 
                     return newIns;
                 });
@@ -176,9 +170,7 @@
             }
             serverSocket.setSoTimeout( acceptTimeOut );
 
-            receiver = new ListenerThread(serverSocket);
-            receiver.setDaemon( true );
-            receiver.start();
+            pooledExecutor.execute(() -> runListener(serverSocket));
         }
         catch ( final IOException ex )
         {
@@ -444,7 +436,9 @@
     /**
      * Processes commands from the server socket. There should be one listener for each configured
      * TCP lateral.
+     * @deprecated No longer used
      */
+    @Deprecated
     public class ListenerThread
         extends Thread
     {
@@ -462,60 +456,62 @@
         }
 
         /** Main processing method for the ListenerThread object */
-        @SuppressWarnings("synthetic-access")
         @Override
         public void run()
         {
-            try (ServerSocket ssck = serverSocket)
+            runListener(serverSocket);
+        }
+    }
+
+    /**
+     * Processes commands from the server socket. There should be one listener for each configured
+     * TCP lateral.
+     */
+    private void runListener(final ServerSocket serverSocket)
+    {
+        try
+        {
+            while ( true )
             {
-                ConnectionHandler handler;
+                log.debug( "Waiting for clients to connect " );
 
-                outer: while ( true )
+                // Check to see if we've been asked to exit, and exit
+                if (terminated.get())
                 {
-                    log.debug( "Waiting for clients to connect " );
+                    log.debug("Thread terminated, exiting gracefully");
+                    break;
+                }
 
-                    Socket socket = null;
-                    inner: while (true)
+                try
+                {
+                    final Socket socket = serverSocket.accept();
+
+                    if (socket != null)
                     {
-                        // Check to see if we've been asked to exit, and exit
-                        if (terminated.get())
-                        {
-                            log.debug("Thread terminated, exiting gracefully");
-                            break outer;
-                        }
-
-                        try
-                        {
-                            socket = ssck.accept();
-                            break inner;
-                        }
-                        catch (final SocketTimeoutException e)
-                        {
-                            // No problem! We loop back up!
-                            continue inner;
-                        }
+                        log.debug("Connected to client at {0}", () -> socket.getInetAddress());
                     }
 
-                    if ( socket != null && log.isDebugEnabled() )
-                    {
-                        final InetAddress inetAddress = socket.getInetAddress();
-                        log.debug( "Connected to client at {0}", inetAddress );
-                    }
-
-                    handler = new ConnectionHandler( socket );
-                    pooledExecutor.execute( handler );
+                    pooledExecutor.execute(() -> handleConnection(socket));
+                }
+                catch (final SocketTimeoutException e)
+                {
+                    // No problem! We loop back up!
                 }
             }
-            catch ( final IOException e )
-            {
-                log.error( "Exception caught in TCP listener", e );
-            }
+
+            serverSocket.close();
+        }
+        catch ( final IOException e )
+        {
+            log.error( "Exception caught in TCP listener", e );
         }
     }
 
     /**
      * A Separate thread that runs when a command comes into the LateralTCPReceiver.
+     * @deprecated No longer used
      */
+    @Deprecated
     public class ConnectionHandler
         implements Runnable
     {
@@ -535,125 +531,127 @@
          * Main processing method for the LateralTCPReceiverConnection object
          */
         @Override
-        @SuppressWarnings({"unchecked", // Need to cast from Object
-            "synthetic-access" })
         public void run()
         {
-            try (ObjectInputStream ois =
-                    new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null ))
+            handleConnection(socket);
+        }
+    }
+
+    /**
+     * A Separate thread that runs when a command comes into the LateralTCPReceiver.
+     */
+    private void handleConnection(final Socket socket)
+    {
+        try (ObjectInputStream ois =
+                new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null ))
+        {
+            while ( true )
             {
-                while ( true )
+                @SuppressWarnings("unchecked") // Need to cast from Object
+                final LateralElementDescriptor<K, V> led =
+                        (LateralElementDescriptor<K, V>) ois.readObject();
+
+                if ( led == null )
                 {
-                    final LateralElementDescriptor<K, V> led =
-                            (LateralElementDescriptor<K, V>) ois.readObject();
+                    log.debug( "LateralElementDescriptor is null" );
+                    continue;
+                }
+                if ( led.requesterId == getListenerId() )
+                {
+                    log.debug( "from self" );
+                }
+                else
+                {
+                    log.debug( "receiving LateralElementDescriptor from another led = {0}",
+                            led );
 
-                    if ( led == null )
-                    {
-                        log.debug( "LateralElementDescriptor is null" );
-                        continue;
-                    }
-                    if ( led.requesterId == getListenerId() )
-                    {
-                        log.debug( "from self" );
-                    }
-                    else
-                    {
-                        log.debug( "receiving LateralElementDescriptor from another led = {0}",
-                                led );
-
-                        handle( led );
-                    }
+                    handleElement(led, socket);
                 }
             }
-            catch ( final EOFException e )
-            {
-                log.info( "Caught EOFException, closing connection.", e );
-            }
-            catch ( final SocketException e )
-            {
-                log.info( "Caught SocketException, closing connection.", e );
-            }
-            catch ( final Exception e )
-            {
-                log.error( "Unexpected exception.", e );
-            }
         }
-
-        /**
-         * This calls the appropriate method, based on the command sent in the Lateral element
-         * descriptor.
-         * <p>
-         * @param led
-         * @throws IOException
-         */
-        @SuppressWarnings("synthetic-access")
-        private void handle( final LateralElementDescriptor<K, V> led )
-            throws IOException
+        catch (final IOException e)
         {
-            final String cacheName = led.ce.getCacheName();
-            final K key = led.ce.getKey();
-            Serializable obj = null;
+            log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
+        }
+        catch (final ClassNotFoundException e)
+        {
+            log.error( "Deserialization failed reading from socket", e );
+        }
+    }
 
-            switch (led.command)
-            {
-                case UPDATE:
-                    handlePut( led.ce );
-                    break;
+    /**
+     * This calls the appropriate method, based on the command sent in the Lateral element
+     * descriptor.
+     * <p>
+     * @param led the lateral element
+     * @param socket the socket
+     * @throws IOException
+     */
+    private void handleElement(final LateralElementDescriptor<K, V> led, Socket socket) throws IOException
+    {
+        final String cacheName = led.ce.getCacheName();
+        final K key = led.ce.getKey();
+        Serializable obj = null;
 
-                case REMOVE:
-                    // if a hashcode was given and filtering is on
-                    // check to see if they are the same
-                    // if so, then don't remove, otherwise issue a remove
-                    if ( led.valHashCode != -1 )
+        switch (led.command)
+        {
+            case UPDATE:
+                handlePut( led.ce );
+                break;
+
+            case REMOVE:
+                // if a hashcode was given and filtering is on
+                // check to see if they are the same
+                // if so, then don't remove, otherwise issue a remove
+                if ( led.valHashCode != -1 )
+                {
+                    if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
                     {
-                        if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
+                        final ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
+                        if ( test != null )
                         {
-                            final ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
-                            if ( test != null )
+                            if ( test.getVal().hashCode() == led.valHashCode )
                             {
-                                if ( test.getVal().hashCode() == led.valHashCode )
-                                {
-                                    log.debug( "Filtering detected identical hashCode [{0}], "
-                                            + "not issuing a remove for led {1}",
-                                            led.valHashCode, led );
-                                    return;
-                                }
-                                else
-                                {
-                                    log.debug( "Different hashcodes, in cache [{0}] sent [{1}]",
-                                            test.getVal().hashCode(), led.valHashCode );
-                                }
+                                log.debug( "Filtering detected identical hashCode [{0}], "
+                                        + "not issuing a remove for led {1}",
+                                        led.valHashCode, led );
+                                return;
+                            }
+                            else
+                            {
+                                log.debug( "Different hashcodes, in cache [{0}] sent [{1}]",
+                                        test.getVal().hashCode(), led.valHashCode );
                             }
                         }
                     }
-                    handleRemove( cacheName, key );
-                    break;
+                }
+                handleRemove( cacheName, key );
+                break;
 
-                case REMOVEALL:
-                    handleRemoveAll( cacheName );
-                    break;
+            case REMOVEALL:
+                handleRemoveAll( cacheName );
+                break;
 
-                case GET:
-                    obj = handleGet( cacheName, key );
-                    break;
+            case GET:
+                obj = handleGet( cacheName, key );
+                break;
 
-                case GET_MATCHING:
-                    obj = (Serializable) handleGetMatching( cacheName, (String) key );
-                    break;
+            case GET_MATCHING:
+                obj = (Serializable) handleGetMatching( cacheName, (String) key );
+                break;
 
-                case GET_KEYSET:
-                	obj = (Serializable) handleGetKeySet(cacheName);
-                    break;
+            case GET_KEYSET:
+                obj = (Serializable) handleGetKeySet(cacheName);
+                break;
 
-                default: break;
-            }
+            default: break;
+        }
 
-            if (obj != null)
-            {
-                final ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
-                oos.writeObject( obj );
-                oos.flush();
-            }
+        if (obj != null)
+        {
+            final ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
+            oos.writeObject( obj );
+            oos.flush();
         }
     }
 
@@ -666,8 +664,7 @@
         if ( shutdown.compareAndSet(false, true) )
         {
             log.info( "Shutting down TCP Lateral receiver." );
-
-            receiver.interrupt();
+            pooledExecutor.shutdownNow();
         }
         else
         {
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/AbstractCacheEventQueue.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/AbstractCacheEventQueue.java
index 6055a82..f131313 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/AbstractCacheEventQueue.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/AbstractCacheEventQueue.java
@@ -205,45 +205,40 @@
      */
     protected abstract class AbstractCacheEvent implements Runnable
     {
-        /** Number of failures encountered processing this event. */
-        int failures;
-
         /**
          * Main processing method for the AbstractCacheEvent object
          */
         @Override
-        @SuppressWarnings("synthetic-access")
         public void run()
         {
-            try
+            for (int failures = 0; failures < maxFailure; failures++)
             {
-                doRun();
-            }
-            catch ( final IOException e )
-            {
-                log.warn( e );
-                if ( ++failures >= maxFailure )
+                try
                 {
-                    log.warn( "Error while running event from Queue: {0}. "
-                            + "Dropping Event and marking Event Queue as "
-                            + "non-functional.", this );
-                    destroy();
+                    doRun();
                     return;
                 }
-                log.info( "Error while running event from Queue: {0}. "
-                        + "Retrying...", this );
+                catch (final IOException e)
+                {
+                    log.warn("Error while running event from Queue: {0}. "
+                            + "Retrying...", this, e);
+                }
+
                 try
                 {
                     Thread.sleep( waitBeforeRetry );
-                    run();
                 }
                 catch ( final InterruptedException ie )
                 {
-                    log.warn( "Interrupted while sleeping for retry on event "
-                            + "{0}.", this );
-                    destroy();
+                    log.warn("Interrupted while sleeping for retry on event "
+                            + "{0}.", this, ie);
+                    break;
                 }
             }
+
+            log.warn( "Dropping Event and marking Event Queue {0} as "
+                    + "non-functional.", this );
+            destroy();
         }
 
         /**
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/event/ElementEventQueue.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/event/ElementEventQueue.java
index e59c611..fd85be2 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/event/ElementEventQueue.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/event/ElementEventQueue.java
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.jcs3.engine.control.event.behavior.IElementEvent;
 import org.apache.commons.jcs3.engine.control.event.behavior.IElementEventHandler;
@@ -28,8 +29,8 @@
 import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
 import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration;
-import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
 import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
+import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
 
 /**
  * An event queue is used to propagate ordered cache events to one and only one target listener.
@@ -43,7 +44,7 @@
     private static final Log log = LogManager.getLog( ElementEventQueue.class );
 
     /** shutdown or not */
-    private boolean destroyed;
+    private AtomicBoolean destroyed = new AtomicBoolean(false);
 
     /** The worker thread pool. */
     private ExecutorService queueProcessor;
@@ -65,15 +66,10 @@
     @Override
     public void dispose()
     {
-        if ( !destroyed )
+        if (destroyed.compareAndSet(false, true))
         {
-            destroyed = true;
-
-            // synchronize on queue so the thread will not wait forever,
-            // and then interrupt the QueueProcessor
+            // shut down the QueueProcessor
             queueProcessor.shutdownNow();
-            queueProcessor = null;
-
             log.info( "Element event queue destroyed: {0}", this );
         }
     }
@@ -89,19 +85,15 @@
         throws IOException
     {
 
-        log.debug( "Adding Event Handler to QUEUE, !destroyed = {0}", !destroyed );
+        log.debug("Adding Event Handler to QUEUE, !destroyed = {0}", !destroyed.get());
 
-        if (destroyed)
+        if (destroyed.get())
         {
             log.warn("Event submitted to disposed element event queue {0}", event);
         }
         else
         {
-            final ElementEventRunner runner = new ElementEventRunner( hand, event );
-
-            log.debug( "runner = {0}", runner );
-
-            queueProcessor.execute(runner);
+            queueProcessor.execute(() -> hand.handleElementEvent(event));
         }
     }
 
@@ -109,14 +101,15 @@
 
     /**
      * Retries before declaring failure.
+     * @deprecated No longer used
      */
+    @Deprecated
     protected abstract class AbstractElementEventRunner
         implements Runnable
     {
         /**
          * Main processing method for the AbstractElementEvent object
          */
-        @SuppressWarnings("synthetic-access")
         @Override
         public void run()
         {
@@ -140,45 +133,4 @@
         protected abstract void doRun()
             throws IOException;
     }
-
-    /**
-     * ElementEventRunner.
-     */
-    private class ElementEventRunner
-        extends AbstractElementEventRunner
-    {
-        /** the handler */
-        private final IElementEventHandler hand;
-
-        /** event */
-        private final IElementEvent<?> event;
-
-        /**
-         * Constructor for the PutEvent object.
-         * <p>
-         * @param hand
-         * @param event
-         * @throws IOException
-         */
-        @SuppressWarnings("synthetic-access")
-        ElementEventRunner( final IElementEventHandler hand, final IElementEvent<?> event )
-            throws IOException
-        {
-            log.debug( "Constructing {0}", this );
-            this.hand = hand;
-            this.event = event;
-        }
-
-        /**
-         * Tells the handler to handle the event.
-         * <p>
-         * @throws IOException
-         */
-        @Override
-        protected void doRun()
-            throws IOException
-        {
-            hand.handleElementEvent( event );
-        }
-    }
 }
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java
index e94f5ac..647dc2d 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java
@@ -19,9 +19,6 @@
  * under the License.
  */
 
-import java.util.HashSet;
-import java.util.Set;
-
 import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
 
@@ -66,27 +63,20 @@
     {
         final long now = System.currentTimeMillis();
 
-        // iterate through the set
-        // it is thread safe
-        // TODO this should get a copy.  you can't simply remove from this.
         // the listeners need to be notified.
-        final Set<DiscoveredService> toRemove = new HashSet<>();
-        // can't remove via the iterator. must remove directly
-        for (final DiscoveredService service : discoveryService.getDiscoveredServices())
-        {
-            if ( ( now - service.getLastHearFromTime() ) > ( maxIdleTimeSeconds * 1000 ) )
-            {
-                log.info( "Removing service, since we haven't heard from it in "
-                        + "{0} seconds. service = {1}", maxIdleTimeSeconds, service );
-                toRemove.add( service );
-            }
-        }
+        discoveryService.getDiscoveredServices().stream()
+            .filter(service -> {
+                if (now - service.getLastHearFromTime() > maxIdleTimeSeconds * 1000)
+                {
+                    log.info( "Removing service, since we haven't heard from it in "
+                            + "{0} seconds. service = {1}", maxIdleTimeSeconds, service );
+                    return true;
+                }
 
-        // remove the bad ones
-        for (final DiscoveredService service : toRemove)
-        {
+                return false;
+            })
+            // remove the bad ones
             // call this so the listeners get notified
-            discoveryService.removeDiscoveredService( service );
-        }
+            .forEach(service -> discoveryService.removeDiscoveredService(service));
     }
 }
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
index 3f03ca5..5e71d63 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
@@ -37,8 +37,8 @@
 import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryMessage.BroadcastType;
 import org.apache.commons.jcs3.utils.net.HostNameUtil;
 import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration;
-import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
 import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
+import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
 
 /** Receives UDP Discovery messages. */
 public class UDPDiscoveryReceiver
@@ -210,18 +210,13 @@
 
                 log.debug( "{0} messages received.", this::getCnt );
 
-                UDPDiscoveryMessage message = null;
-
                 try
                 {
-                    message = (UDPDiscoveryMessage) obj;
+                    UDPDiscoveryMessage message = (UDPDiscoveryMessage) obj;
                     // check for null
                     if ( message != null )
                     {
-                        final MessageHandler handler = new MessageHandler( message );
-
-                        pooledExecutor.execute( handler );
-
+                        pooledExecutor.execute(() -> handleMessage(message));
                         log.debug( "Passed handler to executor." );
                     }
                     else
@@ -269,7 +264,9 @@
 
     /**
      * Separate thread run when a command comes into the UDPDiscoveryReceiver.
+     * @deprectaed No longer used
      */
+    @Deprecated
     public class MessageHandler
         implements Runnable
     {
@@ -287,62 +284,69 @@
         /**
          * Process the message.
          */
-        @SuppressWarnings("synthetic-access")
         @Override
         public void run()
         {
-            // consider comparing ports here instead.
-            if ( message.getRequesterId() == CacheInfo.listenerId )
-            {
-                log.debug( "Ignoring message sent from self" );
-            }
-            else
-            {
-                log.debug( "Process message sent from another" );
-                log.debug( "Message = {0}", message );
-
-                if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() )
-                {
-                    log.debug( "Ignoring invalid message: {0}", message );
-                }
-                else
-                {
-                    processMessage();
-                }
-            }
+            handleMessage(message);
         }
 
-        /**
-         * Process the incoming message.
-         */
-        @SuppressWarnings("synthetic-access")
-        private void processMessage()
-        {
-            final DiscoveredService discoveredService = new DiscoveredService();
-            discoveredService.setServiceAddress( message.getHost() );
-            discoveredService.setCacheNames( message.getCacheNames() );
-            discoveredService.setServicePort( message.getPort() );
-            discoveredService.setLastHearFromTime( System.currentTimeMillis() );
+    }
 
-            // if this is a request message, have the service handle it and
-            // return
-            if ( message.getMessageType() == BroadcastType.REQUEST )
+    /**
+     * Separate thread run when a command comes into the UDPDiscoveryReceiver.
+     */
+    private void handleMessage(UDPDiscoveryMessage message)
+    {
+        // consider comparing ports here instead.
+        if ( message.getRequesterId() == CacheInfo.listenerId )
+        {
+            log.debug( "Ignoring message sent from self" );
+        }
+        else
+        {
+            log.debug( "Process message sent from another" );
+            log.debug( "Message = {0}", message );
+
+            if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() )
             {
-                log.debug( "Message is a Request Broadcast, will have the service handle it." );
-                service.serviceRequestBroadcast();
-            }
-            else if ( message.getMessageType() == BroadcastType.REMOVE )
-            {
-                log.debug( "Removing service from set {0}", discoveredService );
-                service.removeDiscoveredService( discoveredService );
+                log.debug( "Ignoring invalid message: {0}", message );
             }
             else
             {
-                service.addOrUpdateService( discoveredService );
+                processMessage(message);
             }
         }
     }
 
+    /**
+     * Process the incoming message.
+     */
+    private void processMessage(UDPDiscoveryMessage message)
+    {
+        final DiscoveredService discoveredService = new DiscoveredService();
+        discoveredService.setServiceAddress( message.getHost() );
+        discoveredService.setCacheNames( message.getCacheNames() );
+        discoveredService.setServicePort( message.getPort() );
+        discoveredService.setLastHearFromTime( System.currentTimeMillis() );
+
+        // if this is a request message, have the service handle it and
+        // return
+        if ( message.getMessageType() == BroadcastType.REQUEST )
+        {
+            log.debug( "Message is a Request Broadcast, will have the service handle it." );
+            service.serviceRequestBroadcast();
+        }
+        else if ( message.getMessageType() == BroadcastType.REMOVE )
+        {
+            log.debug( "Removing service from set {0}", discoveredService );
+            service.removeDiscoveredService( discoveredService );
+        }
+        else
+        {
+            service.addOrUpdateService( discoveredService );
+        }
+    }
+
     /** Shuts down the socket. */
     @Override
     public void shutdown()