Make IElementSerializer configurable through all levels
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
index 69a0eca..fc08301 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
@@ -390,7 +390,9 @@
// get dereferenced, also we don't want one for every region.
discovery = UDPDiscoveryManager.getInstance().getService( lac.getUdpDiscoveryAddr(),
lac.getUdpDiscoveryPort(),
- lac.getTcpListenerPort(), cacheMgr);
+ lac.getTcpListenerPort(),
+ cacheMgr,
+ elementSerializer);
discovery.addParticipatingCacheName( lac.getCacheName() );
discovery.addDiscoveryListener( discoveryListener );
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java
index 1d51838..f07f4de 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java
@@ -23,9 +23,11 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
import org.apache.commons.jcs3.engine.behavior.IProvideScheduler;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
+import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
/**
* This manages UDPDiscovery Services. We should end up with one service per Lateral Cache Manager
@@ -60,6 +62,7 @@
return INSTANCE;
}
+
/**
* Creates a service for the address and port if one doesn't exist already.
* <p>
@@ -71,11 +74,33 @@
* @param servicePort
* @param cacheMgr
* @return UDPDiscoveryService
+ * @deprecated Specify serializer implementation explicitly
*/
+ @Deprecated
public UDPDiscoveryService getService( final String discoveryAddress, final int discoveryPort, final int servicePort,
final ICompositeCacheManager cacheMgr )
{
- final String key = discoveryAddress + ":" + discoveryPort + ":" + servicePort;
+ return getService(discoveryAddress, discoveryPort, servicePort, cacheMgr, new StandardSerializer());
+ }
+
+ /**
+ * Creates a service for the address and port if one doesn't exist already.
+ * <p>
+ * We need to key this using the listener port too. TODO think of making one discovery service
+ * work for multiple types of clients.
+ * <p>
+ * @param discoveryAddress
+ * @param discoveryPort
+ * @param servicePort
+ * @param cacheMgr
+ * @param serializer
+ *
+ * @return UDPDiscoveryService
+ */
+ public UDPDiscoveryService getService( final String discoveryAddress, final int discoveryPort,
+ final int servicePort, final ICompositeCacheManager cacheMgr, final IElementSerializer serializer )
+ {
+ final String key = String.join(":", discoveryAddress, String.valueOf(discoveryPort), String.valueOf(servicePort));
final UDPDiscoveryService service = services.computeIfAbsent(key, k -> {
log.info( "Creating service for address:port:servicePort [{0}]", key );
@@ -85,7 +110,7 @@
attributes.setUdpDiscoveryPort( discoveryPort );
attributes.setServicePort( servicePort );
- final UDPDiscoveryService newService = new UDPDiscoveryService( attributes );
+ final UDPDiscoveryService newService = new UDPDiscoveryService(attributes, serializer);
// register for shutdown notification
cacheMgr.registerShutdownObserver( newService );
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
index e269301..60b872c 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
@@ -19,9 +19,7 @@
* under the License.
*/
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
@@ -32,7 +30,6 @@
import org.apache.commons.jcs3.engine.CacheInfo;
import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
-import org.apache.commons.jcs3.io.ObjectInputStreamClassLoaderAware;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.net.HostNameUtil;
@@ -47,9 +44,6 @@
/** The log factory */
private static final Log log = LogManager.getLog( UDPDiscoveryReceiver.class );
- /** buffer */
- private final byte[] mBuffer = new byte[65536];
-
/** The socket used for communication. */
private MulticastSocket mSocket;
@@ -159,7 +153,8 @@
public Object waitForMessage()
throws IOException
{
- final DatagramPacket packet = new DatagramPacket( mBuffer, mBuffer.length );
+ final byte[] mBuffer = new byte[65536];
+ final DatagramPacket packet = new DatagramPacket(mBuffer, mBuffer.length);
Object obj = null;
try
{
@@ -170,11 +165,7 @@
log.debug( "Received packet from address [{0}]",
() -> packet.getSocketAddress() );
- try (ByteArrayInputStream byteStream = new ByteArrayInputStream(mBuffer, 0, packet.getLength());
- ObjectInputStream objectStream = new ObjectInputStreamClassLoaderAware(byteStream, null))
- {
- obj = objectStream.readObject();
- }
+ obj = service.getSerializer().deSerialize(mBuffer, null);
if ( obj instanceof UDPDiscoveryMessage )
{
@@ -188,7 +179,7 @@
packet.getSocketAddress(), obj );
}
}
- catch ( final Exception e )
+ catch ( final IOException | ClassNotFoundException e )
{
log.error( "Error receiving multicast packet", e );
}
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
index b153401..b35be3c 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
@@ -27,6 +27,7 @@
import java.util.ArrayList;
import org.apache.commons.jcs3.engine.CacheInfo;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryMessage.BroadcastType;
@@ -52,7 +53,7 @@
private final int multicastPort;
/** Used to serialize messages */
- private final StandardSerializer serializer = new StandardSerializer();
+ private final IElementSerializer serializer;
/**
* Constructor for the UDPDiscoverySender object
@@ -65,10 +66,31 @@
* @param port
* @param udpTTL the Datagram packet time-to-live
* @throws IOException
+ * @deprecated Specify serializer implementation explicitly
*/
+ @Deprecated
public UDPDiscoverySender( final String host, final int port, final int udpTTL )
throws IOException
{
+ this(host, port, udpTTL, new StandardSerializer());
+ }
+
+ /**
+ * Constructor for the UDPDiscoverySender object
+ * <p>
+ * This sender can be used to send multiple messages.
+ * <p>
+ * When you are done sending, you should destroy the socket sender.
+ * <p>
+ * @param host
+ * @param port
+ * @param udpTTL the Datagram packet time-to-live
+ * @param serializer the Serializer to use when sending messages
+ * @throws IOException
+ */
+ public UDPDiscoverySender( final String host, final int port, final int udpTTL, IElementSerializer serializer)
+ throws IOException
+ {
try
{
log.debug( "Constructing socket for sender on port [{0}]", port );
@@ -89,6 +111,7 @@
}
this.multicastPort = port;
+ this.serializer = serializer;
}
/**
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
index bbe4b4e..8d08463 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
@@ -32,12 +32,14 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
import org.apache.commons.jcs3.engine.behavior.IRequireScheduler;
import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.discovery.behavior.IDiscoveryListener;
import org.apache.commons.jcs3.utils.net.HostNameUtil;
+import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
/**
* This service creates a listener that can create lateral caches and add them to the no wait list.
@@ -63,8 +65,11 @@
/** attributes */
private UDPDiscoveryAttributes udpDiscoveryAttributes;
+ /** Used to serialize messages */
+ private final IElementSerializer serializer;
+
/** is this shut down? */
- private AtomicBoolean shutdown = new AtomicBoolean(false);
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
/** This is a set of services that have been discovered. */
private final ConcurrentMap<Integer, DiscoveredService> discoveredServices = new ConcurrentHashMap<>();
@@ -82,9 +87,24 @@
private ScheduledFuture<?> cleanupTaskFuture = null;
/**
- * @param attributes
+ * Constructor
+ *
+ * @param attributes settings of the service
+ * @deprecated Specify serializer implementation explicitly
*/
- public UDPDiscoveryService( final UDPDiscoveryAttributes attributes)
+ @Deprecated
+ public UDPDiscoveryService(final UDPDiscoveryAttributes attributes)
+ {
+ this(attributes, new StandardSerializer());
+ }
+
+ /**
+ * Constructor
+ *
+ * @param attributes settings of service
+ * @param serializer the serializer to use to send and receive messages
+ */
+ public UDPDiscoveryService(final UDPDiscoveryAttributes attributes, IElementSerializer serializer)
{
udpDiscoveryAttributes = attributes.clone();
@@ -114,6 +134,8 @@
getUdpDiscoveryAttributes().getUdpDiscoveryPort(), e );
}
+ this.serializer = serializer;
+
// initiate sender broadcast
initiateBroadcast();
}
@@ -177,7 +199,8 @@
try (UDPDiscoverySender sender = new UDPDiscoverySender(
getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
- getUdpDiscoveryAttributes().getUdpTTL()))
+ getUdpDiscoveryAttributes().getUdpTTL(),
+ getSerializer()))
{
sender.requestBroadcast();
@@ -202,7 +225,8 @@
try (UDPDiscoverySender sender = new UDPDiscoverySender(
getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
- getUdpDiscoveryAttributes().getUdpTTL()))
+ getUdpDiscoveryAttributes().getUdpTTL(),
+ getSerializer()))
{
sender.passiveBroadcast(
getUdpDiscoveryAttributes().getServiceAddress(),
@@ -230,7 +254,8 @@
try (UDPDiscoverySender sender = new UDPDiscoverySender(
getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
- getUdpDiscoveryAttributes().getUdpTTL()))
+ getUdpDiscoveryAttributes().getUdpTTL(),
+ getSerializer()))
{
sender.removeBroadcast(
getUdpDiscoveryAttributes().getServiceAddress(),
@@ -335,6 +360,16 @@
}
/**
+ * Return the serializer implementation
+ *
+ * @return the serializer
+ */
+ public IElementSerializer getSerializer()
+ {
+ return serializer;
+ }
+
+ /**
* Start necessary receiver thread
*/
public void startup()