Replace several Runnable classes with Lambdas, deprecate orig. classes
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/server/RegistryKeepAliveRunner.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/server/RegistryKeepAliveRunner.java
index c184f3b..f1687b7 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/server/RegistryKeepAliveRunner.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/server/RegistryKeepAliveRunner.java
@@ -32,7 +32,9 @@
 /**
  * This class tries to keep the registry alive. If if is able to create a registry, it will also
  * rebind the remote cache server.
+ * @deprecated Functionality moved to RemoteCacheServerFactory
  */
+@Deprecated
 public class RegistryKeepAliveRunner
     implements Runnable
 {
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java
index 647dc2d..ea72028 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java
@@ -28,7 +28,9 @@
  * If they exceed the configurable limit, it removes them from the set.
  * <p>
  * @author Aaron Smuts
+ * @deprecated Functionality moved to UDPDiscoveryService
  */
+@Deprecated
 public class UDPCleanupRunner
     implements Runnable
 {
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
index 5e71d63..2e2baba 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
@@ -27,6 +27,7 @@
 import java.net.MulticastSocket;
 import java.net.NetworkInterface;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.jcs3.engine.CacheInfo;
@@ -34,7 +35,6 @@
 import org.apache.commons.jcs3.io.ObjectInputStreamClassLoaderAware;
 import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
-import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryMessage.BroadcastType;
 import org.apache.commons.jcs3.utils.net.HostNameUtil;
 import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration;
 import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
@@ -72,7 +72,7 @@
     private final InetAddress multicastAddress;
 
     /** Is it shutdown. */
-    private boolean shutdown;
+    private AtomicBoolean shutdown = new AtomicBoolean(false);
 
     /**
      * Constructor for the LateralUDPReceiver object.
@@ -202,7 +202,7 @@
     {
         try
         {
-            while ( !shutdown )
+            while (!shutdown.get())
             {
                 final Object obj = waitForMessage();
 
@@ -329,21 +329,22 @@
         discoveredService.setServicePort( message.getPort() );
         discoveredService.setLastHearFromTime( System.currentTimeMillis() );
 
-        // if this is a request message, have the service handle it and
-        // return
-        if ( message.getMessageType() == BroadcastType.REQUEST )
+        switch (message.getMessageType())
         {
-            log.debug( "Message is a Request Broadcast, will have the service handle it." );
-            service.serviceRequestBroadcast();
-        }
-        else if ( message.getMessageType() == BroadcastType.REMOVE )
-        {
-            log.debug( "Removing service from set {0}", discoveredService );
-            service.removeDiscoveredService( discoveredService );
-        }
-        else
-        {
-            service.addOrUpdateService( discoveredService );
+            case REMOVE:
+                log.debug( "Removing service from set {0}", discoveredService );
+                service.removeDiscoveredService( discoveredService );
+                break;
+            case REQUEST:
+                // if this is a request message, have the service handle it and
+                // return
+                log.debug( "Message is a Request Broadcast, will have the service handle it." );
+                service.serviceRequestBroadcast();
+                break;
+            case PASSIVE:
+            default:
+                service.addOrUpdateService( discoveredService );
+                break;
         }
     }
 
@@ -351,11 +352,10 @@
     @Override
     public void shutdown()
     {
-        if (!shutdown)
+        if (shutdown.compareAndSet(false, true))
         {
             try
             {
-                shutdown = true;
                 mSocket.leaveGroup( multicastAddress );
                 mSocket.close();
                 pooledExecutor.shutdownNow();
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
index 1f431ce..b153401 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
@@ -125,23 +125,15 @@
         log.debug( "sending UDPDiscoveryMessage, address [{0}], port [{1}], "
                 + "message = {2}", multicastAddress, multicastPort, message );
 
-        try
-        {
-            final byte[] bytes = serializer.serialize( message );
+        final byte[] bytes = serializer.serialize( message );
 
-            // put the byte array in a packet
-            final DatagramPacket packet = new DatagramPacket( bytes, bytes.length, multicastAddress, multicastPort );
+        // put the byte array in a packet
+        final DatagramPacket packet = new DatagramPacket( bytes, bytes.length, multicastAddress, multicastPort );
 
-            log.debug( "Sending DatagramPacket. bytes.length [{0}] to {1}:{2}",
-                    bytes.length, multicastAddress, multicastPort );
+        log.debug( "Sending DatagramPacket with {0} bytes to {1}:{2}",
+                bytes.length, multicastAddress, multicastPort );
 
-            localSocket.send( packet );
-        }
-        catch ( final IOException e )
-        {
-            log.error( "Error sending message", e );
-            throw e;
-        }
+        localSocket.send( packet );
     }
 
     /**
@@ -244,8 +236,9 @@
  * <p>
  * @author asmuts
  * @created January 15, 2002
+ * @deprecated No longer used
  */
-
+@Deprecated
 class MyByteArrayOutputStream
     extends ByteArrayOutputStream
 {
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderThread.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderThread.java
index c7ec781..1204e69 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderThread.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderThread.java
@@ -27,7 +27,9 @@
 
 /**
  * Used to periodically broadcast our location to other caches that might be listening.
+ * @deprecated Functionality moved to UDPDiscoveryService
  */
+@Deprecated
 public class UDPDiscoverySenderThread
     implements Runnable
 {
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
index 3f324aa..caa64d0 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
@@ -26,7 +26,9 @@
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.jcs3.engine.behavior.IRequireScheduler;
 import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
@@ -56,14 +58,11 @@
     /** the runnable that the receiver thread runs */
     private UDPDiscoveryReceiver receiver;
 
-    /** the runnable that sends messages via the clock daemon */
-    private final UDPDiscoverySenderThread sender;
-
     /** attributes */
     private UDPDiscoveryAttributes udpDiscoveryAttributes;
 
     /** is this shut down? */
-    private boolean shutdown;
+    private AtomicBoolean shutdown = new AtomicBoolean(false);
 
     /** This is a set of services that have been discovered. */
     private final Set<DiscoveredService> discoveredServices = new CopyOnWriteArraySet<>();
@@ -74,6 +73,12 @@
     /** Set of listeners. */
     private final Set<IDiscoveryListener> discoveryListeners = new CopyOnWriteArraySet<>();
 
+    /** Handle to cancel the scheduled broadcast task */
+    private ScheduledFuture<?> broadcastTaskFuture = null;
+
+    /** Handle to cancel the scheduled cleanup task */
+    private ScheduledFuture<?> cleanupTaskFuture = null;
+
     /**
      * @param attributes
      */
@@ -107,8 +112,8 @@
                     getUdpDiscoveryAttributes().getUdpDiscoveryPort(), e );
         }
 
-        // create a sender thread
-        sender = new UDPDiscoverySenderThread( getUdpDiscoveryAttributes(), getCacheNames() );
+        // initiate sender broadcast
+        initiateBroadcast();
     }
 
     /**
@@ -117,17 +122,69 @@
     @Override
     public void setScheduledExecutorService(final ScheduledExecutorService scheduledExecutor)
     {
-        if (sender != null)
-        {
-            scheduledExecutor.scheduleAtFixedRate(sender, 0, 15, TimeUnit.SECONDS);
-        }
+        this.broadcastTaskFuture = scheduledExecutor.scheduleAtFixedRate(
+                () -> serviceRequestBroadcast(), 0, 15, TimeUnit.SECONDS);
 
         /** removes things that have been idle for too long */
-        final UDPCleanupRunner cleanup = new UDPCleanupRunner( this );
         // I'm going to use this as both, but it could happen
         // that something could hang around twice the time using this as the
         // delay and the idle time.
-        scheduledExecutor.scheduleAtFixedRate(cleanup, 0, getUdpDiscoveryAttributes().getMaxIdleTimeSec(), TimeUnit.SECONDS);
+        this.cleanupTaskFuture = scheduledExecutor.scheduleAtFixedRate(
+                () -> cleanup(), 0,
+                getUdpDiscoveryAttributes().getMaxIdleTimeSec(), TimeUnit.SECONDS);
+    }
+
+    /**
+     * This goes through the list of services and removes those that we haven't heard from in longer
+     * than the max idle time.
+     */
+    protected void cleanup()
+    {
+        final long now = System.currentTimeMillis();
+
+        // the listeners need to be notified.
+        getDiscoveredServices().stream()
+            .filter(service -> {
+                if (now - service.getLastHearFromTime() > getUdpDiscoveryAttributes().getMaxIdleTimeSec() * 1000)
+                {
+                    log.info( "Removing service, since we haven't heard from it in "
+                            + "{0} seconds. service = {1}",
+                            getUdpDiscoveryAttributes().getMaxIdleTimeSec(), service );
+                    return true;
+                }
+
+                return false;
+            })
+            // remove the bad ones
+            // call this so the listeners get notified
+            .forEach(service -> removeDiscoveredService(service));
+    }
+
+    /**
+     * Initial request that the other caches let it know their addresses.
+     */
+    public void initiateBroadcast()
+    {
+        log.debug( "Creating sender thread for discoveryAddress = [{0}] and "
+                + "discoveryPort = [{1}] myHostName = [{2}] and port = [{3}]",
+                () -> getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
+                () -> getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
+                () -> getUdpDiscoveryAttributes().getServiceAddress(),
+                () -> getUdpDiscoveryAttributes().getServicePort() );
+
+        try (UDPDiscoverySender sender = new UDPDiscoverySender(
+                getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
+                getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
+                getUdpDiscoveryAttributes().getUdpTTL()))
+        {
+            sender.requestBroadcast();
+
+            log.debug( "Sent a request broadcast to the group" );
+        }
+        catch ( final IOException e )
+        {
+            log.error( "Problem sending a Request Broadcast", e );
+        }
     }
 
     /**
@@ -145,11 +202,10 @@
                 getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
                 getUdpDiscoveryAttributes().getUdpTTL()))
         {
-            sender.passiveBroadcast( getUdpDiscoveryAttributes().getServiceAddress(), getUdpDiscoveryAttributes()
-                .getServicePort(), this.getCacheNames() );
-
-            // todo we should consider sending a request broadcast every so
-            // often.
+            sender.passiveBroadcast(
+                    getUdpDiscoveryAttributes().getServiceAddress(),
+                    getUdpDiscoveryAttributes().getServicePort(),
+                    this.getCacheNames() );
 
             log.debug( "Called sender to issue a passive broadcast" );
         }
@@ -163,6 +219,31 @@
     }
 
     /**
+     * Issues a remove broadcast to the others.
+     */
+    protected void shutdownBroadcast()
+    {
+        // create this connection each time.
+        // more robust
+        try (UDPDiscoverySender sender = new UDPDiscoverySender(
+                getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
+                getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
+                getUdpDiscoveryAttributes().getUdpTTL()))
+        {
+            sender.removeBroadcast(
+                    getUdpDiscoveryAttributes().getServiceAddress(),
+                    getUdpDiscoveryAttributes().getServicePort(),
+                    this.getCacheNames() );
+
+            log.debug( "Called sender to issue a remove broadcast in shutdown." );
+        }
+        catch ( final IOException e )
+        {
+            log.error( "Problem calling the UDP Discovery Sender", e );
+        }
+    }
+
+    /**
      * Adds a region to the list that is participating in discovery.
      * <p>
      * @param cacheName
@@ -170,7 +251,6 @@
     public void addParticipatingCacheName( final String cacheName )
     {
         cacheNames.add( cacheName );
-        sender.setCacheNames( getCacheNames() );
     }
 
     /**
@@ -295,9 +375,17 @@
     @Override
     public void shutdown()
     {
-        if ( !shutdown )
+        if (shutdown.compareAndSet(false, true))
         {
-            shutdown = true;
+            // Stop the scheduled tasks
+            if (broadcastTaskFuture != null)
+            {
+                broadcastTaskFuture.cancel(false);
+            }
+            if (cleanupTaskFuture != null)
+            {
+                cleanupTaskFuture.cancel(false);
+            }
 
             // no good way to do this right now.
             if (receiver != null)
@@ -307,13 +395,10 @@
                 udpReceiverThread.interrupt();
             }
 
-            if (sender != null)
-            {
-                log.info( "Shutting down UDP discovery service sender." );
-                // also call the shutdown on the sender thread itself, which
-                // will result in a remove command.
-                sender.shutdown();
-            }
+            log.info( "Shutting down UDP discovery service sender." );
+            // also call the shutdown on the sender thread itself, which
+            // will result in a remove command.
+            shutdownBroadcast();
         }
         else
         {
diff --git a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/auxiliary/remote/server/RegistryKeepAliveRunnerUnitTest.java b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/auxiliary/remote/server/RegistryKeepAliveRunnerUnitTest.java
deleted file mode 100644
index d47c43b..0000000
--- a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/auxiliary/remote/server/RegistryKeepAliveRunnerUnitTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.commons.jcs3.auxiliary.remote.server;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import junit.framework.TestCase;
-import org.apache.commons.jcs3.auxiliary.MockCacheEventLogger;
-
-/** Unit tests for the registry keep alive runner. */
-public class RegistryKeepAliveRunnerUnitTest
-    extends TestCase
-{
-    /** Verify that we get the appropriate event log */
-    public void testCheckAndRestoreIfNeeded_failure()
-    {
-        // SETUP
-        final String host = "localhost";
-        final int port = 1234;
-        final String service = "doesn'texist";
-        final MockCacheEventLogger cacheEventLogger = new MockCacheEventLogger();
-
-        final RegistryKeepAliveRunner runner = new RegistryKeepAliveRunner( host, port, service );
-        runner.setCacheEventLogger( cacheEventLogger );
-
-        // DO WORK
-        runner.checkAndRestoreIfNeeded();
-
-        // VERIFY
-        // 1 for the lookup, one for the rebind since the server isn't created yet
-        assertEquals( "error tally", 2, cacheEventLogger.errorEventCalls );
-        //System.out.println( cacheEventLogger.errorMessages );
-    }
-}