Replace several Runnable classes with Lambdas, deprecate orig. classes
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/server/RegistryKeepAliveRunner.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/server/RegistryKeepAliveRunner.java
index c184f3b..f1687b7 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/server/RegistryKeepAliveRunner.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/server/RegistryKeepAliveRunner.java
@@ -32,7 +32,9 @@
/**
* This class tries to keep the registry alive. If if is able to create a registry, it will also
* rebind the remote cache server.
+ * @deprecated Functionality moved to RemoteCacheServerFactory
*/
+@Deprecated
public class RegistryKeepAliveRunner
implements Runnable
{
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java
index 647dc2d..ea72028 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPCleanupRunner.java
@@ -28,7 +28,9 @@
* If they exceed the configurable limit, it removes them from the set.
* <p>
* @author Aaron Smuts
+ * @deprecated Functionality moved to UDPDiscoveryService
*/
+@Deprecated
public class UDPCleanupRunner
implements Runnable
{
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
index 5e71d63..2e2baba 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
@@ -27,6 +27,7 @@
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.jcs3.engine.CacheInfo;
@@ -34,7 +35,6 @@
import org.apache.commons.jcs3.io.ObjectInputStreamClassLoaderAware;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
-import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryMessage.BroadcastType;
import org.apache.commons.jcs3.utils.net.HostNameUtil;
import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration;
import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
@@ -72,7 +72,7 @@
private final InetAddress multicastAddress;
/** Is it shutdown. */
- private boolean shutdown;
+ private AtomicBoolean shutdown = new AtomicBoolean(false);
/**
* Constructor for the LateralUDPReceiver object.
@@ -202,7 +202,7 @@
{
try
{
- while ( !shutdown )
+ while (!shutdown.get())
{
final Object obj = waitForMessage();
@@ -329,21 +329,22 @@
discoveredService.setServicePort( message.getPort() );
discoveredService.setLastHearFromTime( System.currentTimeMillis() );
- // if this is a request message, have the service handle it and
- // return
- if ( message.getMessageType() == BroadcastType.REQUEST )
+ switch (message.getMessageType())
{
- log.debug( "Message is a Request Broadcast, will have the service handle it." );
- service.serviceRequestBroadcast();
- }
- else if ( message.getMessageType() == BroadcastType.REMOVE )
- {
- log.debug( "Removing service from set {0}", discoveredService );
- service.removeDiscoveredService( discoveredService );
- }
- else
- {
- service.addOrUpdateService( discoveredService );
+ case REMOVE:
+ log.debug( "Removing service from set {0}", discoveredService );
+ service.removeDiscoveredService( discoveredService );
+ break;
+ case REQUEST:
+ // if this is a request message, have the service handle it and
+ // return
+ log.debug( "Message is a Request Broadcast, will have the service handle it." );
+ service.serviceRequestBroadcast();
+ break;
+ case PASSIVE:
+ default:
+ service.addOrUpdateService( discoveredService );
+ break;
}
}
@@ -351,11 +352,10 @@
@Override
public void shutdown()
{
- if (!shutdown)
+ if (shutdown.compareAndSet(false, true))
{
try
{
- shutdown = true;
mSocket.leaveGroup( multicastAddress );
mSocket.close();
pooledExecutor.shutdownNow();
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
index 1f431ce..b153401 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
@@ -125,23 +125,15 @@
log.debug( "sending UDPDiscoveryMessage, address [{0}], port [{1}], "
+ "message = {2}", multicastAddress, multicastPort, message );
- try
- {
- final byte[] bytes = serializer.serialize( message );
+ final byte[] bytes = serializer.serialize( message );
- // put the byte array in a packet
- final DatagramPacket packet = new DatagramPacket( bytes, bytes.length, multicastAddress, multicastPort );
+ // put the byte array in a packet
+ final DatagramPacket packet = new DatagramPacket( bytes, bytes.length, multicastAddress, multicastPort );
- log.debug( "Sending DatagramPacket. bytes.length [{0}] to {1}:{2}",
- bytes.length, multicastAddress, multicastPort );
+ log.debug( "Sending DatagramPacket with {0} bytes to {1}:{2}",
+ bytes.length, multicastAddress, multicastPort );
- localSocket.send( packet );
- }
- catch ( final IOException e )
- {
- log.error( "Error sending message", e );
- throw e;
- }
+ localSocket.send( packet );
}
/**
@@ -244,8 +236,9 @@
* <p>
* @author asmuts
* @created January 15, 2002
+ * @deprecated No longer used
*/
-
+@Deprecated
class MyByteArrayOutputStream
extends ByteArrayOutputStream
{
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderThread.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderThread.java
index c7ec781..1204e69 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderThread.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySenderThread.java
@@ -27,7 +27,9 @@
/**
* Used to periodically broadcast our location to other caches that might be listening.
+ * @deprecated Functionality moved to UDPDiscoveryService
*/
+@Deprecated
public class UDPDiscoverySenderThread
implements Runnable
{
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
index 3f324aa..caa64d0 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
@@ -26,7 +26,9 @@
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.jcs3.engine.behavior.IRequireScheduler;
import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
@@ -56,14 +58,11 @@
/** the runnable that the receiver thread runs */
private UDPDiscoveryReceiver receiver;
- /** the runnable that sends messages via the clock daemon */
- private final UDPDiscoverySenderThread sender;
-
/** attributes */
private UDPDiscoveryAttributes udpDiscoveryAttributes;
/** is this shut down? */
- private boolean shutdown;
+ private AtomicBoolean shutdown = new AtomicBoolean(false);
/** This is a set of services that have been discovered. */
private final Set<DiscoveredService> discoveredServices = new CopyOnWriteArraySet<>();
@@ -74,6 +73,12 @@
/** Set of listeners. */
private final Set<IDiscoveryListener> discoveryListeners = new CopyOnWriteArraySet<>();
+ /** Handle to cancel the scheduled broadcast task */
+ private ScheduledFuture<?> broadcastTaskFuture = null;
+
+ /** Handle to cancel the scheduled cleanup task */
+ private ScheduledFuture<?> cleanupTaskFuture = null;
+
/**
* @param attributes
*/
@@ -107,8 +112,8 @@
getUdpDiscoveryAttributes().getUdpDiscoveryPort(), e );
}
- // create a sender thread
- sender = new UDPDiscoverySenderThread( getUdpDiscoveryAttributes(), getCacheNames() );
+ // initiate sender broadcast
+ initiateBroadcast();
}
/**
@@ -117,17 +122,69 @@
@Override
public void setScheduledExecutorService(final ScheduledExecutorService scheduledExecutor)
{
- if (sender != null)
- {
- scheduledExecutor.scheduleAtFixedRate(sender, 0, 15, TimeUnit.SECONDS);
- }
+ this.broadcastTaskFuture = scheduledExecutor.scheduleAtFixedRate(
+ () -> serviceRequestBroadcast(), 0, 15, TimeUnit.SECONDS);
/** removes things that have been idle for too long */
- final UDPCleanupRunner cleanup = new UDPCleanupRunner( this );
// I'm going to use this as both, but it could happen
// that something could hang around twice the time using this as the
// delay and the idle time.
- scheduledExecutor.scheduleAtFixedRate(cleanup, 0, getUdpDiscoveryAttributes().getMaxIdleTimeSec(), TimeUnit.SECONDS);
+ this.cleanupTaskFuture = scheduledExecutor.scheduleAtFixedRate(
+ () -> cleanup(), 0,
+ getUdpDiscoveryAttributes().getMaxIdleTimeSec(), TimeUnit.SECONDS);
+ }
+
+ /**
+ * This goes through the list of services and removes those that we haven't heard from in longer
+ * than the max idle time.
+ */
+ protected void cleanup()
+ {
+ final long now = System.currentTimeMillis();
+
+ // the listeners need to be notified.
+ getDiscoveredServices().stream()
+ .filter(service -> {
+ if (now - service.getLastHearFromTime() > getUdpDiscoveryAttributes().getMaxIdleTimeSec() * 1000)
+ {
+ log.info( "Removing service, since we haven't heard from it in "
+ + "{0} seconds. service = {1}",
+ getUdpDiscoveryAttributes().getMaxIdleTimeSec(), service );
+ return true;
+ }
+
+ return false;
+ })
+ // remove the bad ones
+ // call this so the listeners get notified
+ .forEach(service -> removeDiscoveredService(service));
+ }
+
+ /**
+ * Initial request that the other caches let it know their addresses.
+ */
+ public void initiateBroadcast()
+ {
+ log.debug( "Creating sender thread for discoveryAddress = [{0}] and "
+ + "discoveryPort = [{1}] myHostName = [{2}] and port = [{3}]",
+ () -> getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
+ () -> getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
+ () -> getUdpDiscoveryAttributes().getServiceAddress(),
+ () -> getUdpDiscoveryAttributes().getServicePort() );
+
+ try (UDPDiscoverySender sender = new UDPDiscoverySender(
+ getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
+ getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
+ getUdpDiscoveryAttributes().getUdpTTL()))
+ {
+ sender.requestBroadcast();
+
+ log.debug( "Sent a request broadcast to the group" );
+ }
+ catch ( final IOException e )
+ {
+ log.error( "Problem sending a Request Broadcast", e );
+ }
}
/**
@@ -145,11 +202,10 @@
getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
getUdpDiscoveryAttributes().getUdpTTL()))
{
- sender.passiveBroadcast( getUdpDiscoveryAttributes().getServiceAddress(), getUdpDiscoveryAttributes()
- .getServicePort(), this.getCacheNames() );
-
- // todo we should consider sending a request broadcast every so
- // often.
+ sender.passiveBroadcast(
+ getUdpDiscoveryAttributes().getServiceAddress(),
+ getUdpDiscoveryAttributes().getServicePort(),
+ this.getCacheNames() );
log.debug( "Called sender to issue a passive broadcast" );
}
@@ -163,6 +219,31 @@
}
/**
+ * Issues a remove broadcast to the others.
+ */
+ protected void shutdownBroadcast()
+ {
+ // create this connection each time.
+ // more robust
+ try (UDPDiscoverySender sender = new UDPDiscoverySender(
+ getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
+ getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
+ getUdpDiscoveryAttributes().getUdpTTL()))
+ {
+ sender.removeBroadcast(
+ getUdpDiscoveryAttributes().getServiceAddress(),
+ getUdpDiscoveryAttributes().getServicePort(),
+ this.getCacheNames() );
+
+ log.debug( "Called sender to issue a remove broadcast in shutdown." );
+ }
+ catch ( final IOException e )
+ {
+ log.error( "Problem calling the UDP Discovery Sender", e );
+ }
+ }
+
+ /**
* Adds a region to the list that is participating in discovery.
* <p>
* @param cacheName
@@ -170,7 +251,6 @@
public void addParticipatingCacheName( final String cacheName )
{
cacheNames.add( cacheName );
- sender.setCacheNames( getCacheNames() );
}
/**
@@ -295,9 +375,17 @@
@Override
public void shutdown()
{
- if ( !shutdown )
+ if (shutdown.compareAndSet(false, true))
{
- shutdown = true;
+ // Stop the scheduled tasks
+ if (broadcastTaskFuture != null)
+ {
+ broadcastTaskFuture.cancel(false);
+ }
+ if (cleanupTaskFuture != null)
+ {
+ cleanupTaskFuture.cancel(false);
+ }
// no good way to do this right now.
if (receiver != null)
@@ -307,13 +395,10 @@
udpReceiverThread.interrupt();
}
- if (sender != null)
- {
- log.info( "Shutting down UDP discovery service sender." );
- // also call the shutdown on the sender thread itself, which
- // will result in a remove command.
- sender.shutdown();
- }
+ log.info( "Shutting down UDP discovery service sender." );
+ // also call the shutdown on the sender thread itself, which
+ // will result in a remove command.
+ shutdownBroadcast();
}
else
{
diff --git a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/auxiliary/remote/server/RegistryKeepAliveRunnerUnitTest.java b/commons-jcs-core/src/test/java/org/apache/commons/jcs3/auxiliary/remote/server/RegistryKeepAliveRunnerUnitTest.java
deleted file mode 100644
index d47c43b..0000000
--- a/commons-jcs-core/src/test/java/org/apache/commons/jcs3/auxiliary/remote/server/RegistryKeepAliveRunnerUnitTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.commons.jcs3.auxiliary.remote.server;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import junit.framework.TestCase;
-import org.apache.commons.jcs3.auxiliary.MockCacheEventLogger;
-
-/** Unit tests for the registry keep alive runner. */
-public class RegistryKeepAliveRunnerUnitTest
- extends TestCase
-{
- /** Verify that we get the appropriate event log */
- public void testCheckAndRestoreIfNeeded_failure()
- {
- // SETUP
- final String host = "localhost";
- final int port = 1234;
- final String service = "doesn'texist";
- final MockCacheEventLogger cacheEventLogger = new MockCacheEventLogger();
-
- final RegistryKeepAliveRunner runner = new RegistryKeepAliveRunner( host, port, service );
- runner.setCacheEventLogger( cacheEventLogger );
-
- // DO WORK
- runner.checkAndRestoreIfNeeded();
-
- // VERIFY
- // 1 for the lookup, one for the rebind since the server isn't created yet
- assertEquals( "error tally", 2, cacheEventLogger.errorEventCalls );
- //System.out.println( cacheEventLogger.errorMessages );
- }
-}