Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/commons-jcs.git
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/jdbc/JDBCDiskCache.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/jdbc/JDBCDiskCache.java
index b425a61..4c8ede0 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/jdbc/JDBCDiskCache.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/jdbc/JDBCDiskCache.java
@@ -37,6 +37,7 @@
 import org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes;
 import org.apache.commons.jcs3.auxiliary.disk.AbstractDiskCache;
 import org.apache.commons.jcs3.auxiliary.disk.jdbc.dsfactory.DataSourceFactory;
+import org.apache.commons.jcs3.engine.behavior.ICache;
 import org.apache.commons.jcs3.engine.behavior.ICacheElement;
 import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
 import org.apache.commons.jcs3.engine.logging.behavior.ICacheEvent;
@@ -99,9 +100,6 @@
     /** # of times getMatching was called */
     private final AtomicInteger getMatchingCount = new AtomicInteger(0);
 
-    /** if count % interval == 0 then log */
-    private static final int LOG_INTERVAL = 100;
-
     /** db connection pool */
     private final DataSourceFactory dsFactory;
 
@@ -150,33 +148,20 @@
         {
             log.debug( "Putting [{0}] on disk.",  () -> ce.getKey());
 
-            final byte[] element;
-
             try
             {
-                element = getElementSerializer().serialize( ce );
+                final byte[] element = getElementSerializer().serialize( ce );
+                insertOrUpdate( ce, con, element );
             }
             catch ( final IOException e )
             {
                 log.error( "Could not serialize element", e );
-                return;
             }
-
-            insertOrUpdate( ce, con, element );
         }
         catch ( final SQLException e )
         {
             log.error( "Problem getting connection.", e );
         }
-
-        if ( log.isInfoEnabled() )
-        {
-            if ( updateCount.get() % LOG_INTERVAL == 0 )
-            {
-                // TODO make a log stats method
-                log.info( "Update Count [{0}]", updateCount);
-            }
-        }
     }
 
     /**
@@ -222,25 +207,19 @@
     private boolean insertRow( final ICacheElement<K, V> ce, final Connection con, final byte[] element )
     {
         boolean exists = false;
-        final String sqlI = "insert into "
-                + getJdbcDiskCacheAttributes().getTableName()
-                + " (CACHE_KEY, REGION, ELEMENT, MAX_LIFE_SECONDS, IS_ETERNAL, CREATE_TIME, UPDATE_TIME_SECONDS, SYSTEM_EXPIRE_TIME_SECONDS) "
-                + " values (?, ?, ?, ?, ?, ?, ?, ?)";
+        final String sqlI = String.format("insert into %s"
+                + " (CACHE_KEY, REGION, ELEMENT, MAX_LIFE_SECONDS, IS_ETERNAL, CREATE_TIME, UPDATE_TIME_SECONDS,"
+                + " SYSTEM_EXPIRE_TIME_SECONDS) "
+                + " values (?, ?, ?, ?, ?, ?, ?, ?)", getJdbcDiskCacheAttributes().getTableName());
 
         try (PreparedStatement psInsert = con.prepareStatement( sqlI ))
         {
-            psInsert.setString( 1, (String) ce.getKey() );
+            psInsert.setString( 1, ce.getKey().toString() );
             psInsert.setString( 2, this.getCacheName() );
             psInsert.setBytes( 3, element );
             psInsert.setLong( 4, ce.getElementAttributes().getMaxLife() );
-            if ( ce.getElementAttributes().getIsEternal() )
-            {
-                psInsert.setString( 5, "T" );
-            }
-            else
-            {
-                psInsert.setString( 5, "F" );
-            }
+            psInsert.setString( 5, ce.getElementAttributes().getIsEternal() ? "T" : "F" );
+
             final Timestamp createTime = new Timestamp( ce.getElementAttributes().getCreateTime() );
             psInsert.setTimestamp( 6, createTime );
 
@@ -282,9 +261,9 @@
      */
     private void updateRow( final ICacheElement<K, V> ce, final Connection con, final byte[] element )
     {
-        final String sqlU = "update " + getJdbcDiskCacheAttributes().getTableName()
+        final String sqlU = String.format("update %s"
                 + " set ELEMENT  = ?, CREATE_TIME = ?, UPDATE_TIME_SECONDS = ?, " + " SYSTEM_EXPIRE_TIME_SECONDS = ? "
-                + " where CACHE_KEY = ? and REGION = ?";
+                + " where CACHE_KEY = ? and REGION = ?", getJdbcDiskCacheAttributes().getTableName());
 
         try (PreparedStatement psUpdate = con.prepareStatement( sqlU ))
         {
@@ -322,8 +301,8 @@
     {
         boolean exists = false;
         // don't select the element, since we want this to be fast.
-        final String sqlS = "select CACHE_KEY from " + getJdbcDiskCacheAttributes().getTableName()
-            + " where REGION = ? and CACHE_KEY = ?";
+        final String sqlS = String.format("select CACHE_KEY from %s where REGION = ? and CACHE_KEY = ?",
+                getJdbcDiskCacheAttributes().getTableName());
 
         try (PreparedStatement psSelect = con.prepareStatement( sqlS ))
         {
@@ -366,37 +345,36 @@
 
         ICacheElement<K, V> obj = null;
 
-        byte[] data = null;
-        try
+        // region, key
+        final String selectString = String.format("select ELEMENT from %s where REGION = ? and CACHE_KEY = ?",
+                getJdbcDiskCacheAttributes().getTableName());
+
+        try (Connection con = getDataSource().getConnection())
         {
-            // region, key
-            final String selectString = "select ELEMENT from " + getJdbcDiskCacheAttributes().getTableName()
-                + " where REGION = ? and CACHE_KEY = ?";
-
-            try (Connection con = getDataSource().getConnection())
+            try (PreparedStatement psSelect = con.prepareStatement( selectString ))
             {
-                try (PreparedStatement psSelect = con.prepareStatement( selectString ))
-                {
-                    psSelect.setString( 1, this.getCacheName() );
-                    psSelect.setString( 2, key.toString() );
+                psSelect.setString( 1, this.getCacheName() );
+                psSelect.setString( 2, key.toString() );
 
-                    try (ResultSet rs = psSelect.executeQuery())
+                try (ResultSet rs = psSelect.executeQuery())
+                {
+                    byte[] data = null;
+
+                    if ( rs.next() )
                     {
-                        if ( rs.next() )
+                        data = rs.getBytes( 1 );
+                    }
+
+                    if ( data != null )
+                    {
+                        try
                         {
-                            data = rs.getBytes( 1 );
+                            // USE THE SERIALIZER
+                            obj = getElementSerializer().deSerialize( data, null );
                         }
-                        if ( data != null )
+                        catch ( final IOException | ClassNotFoundException e )
                         {
-                            try
-                            {
-                                // USE THE SERIALIZER
-                                obj = getElementSerializer().deSerialize( data, null );
-                            }
-                            catch ( final Exception e )
-                            {
-                                log.error( "Problem getting item for key [{0}]", key, e );
-                            }
+                            log.error( "Problem getting item for key [{0}]", key, e );
                         }
                     }
                 }
@@ -408,14 +386,6 @@
                     key, sqle );
         }
 
-        if ( log.isInfoEnabled() )
-        {
-            if ( getCount.get() % LOG_INTERVAL == 0 )
-            {
-                // TODO make a log stats method
-                log.info( "Get Count [{0}]", getCount );
-            }
-        }
         return obj;
     }
 
@@ -440,37 +410,33 @@
 
         final Map<K, ICacheElement<K, V>> results = new HashMap<>();
 
-        try
+        // region, key
+        final String selectString = String.format("select ELEMENT from %s where REGION = ? and CACHE_KEY like ?",
+                getJdbcDiskCacheAttributes().getTableName());
+
+        try (Connection con = getDataSource().getConnection())
         {
-            // region, key
-            final String selectString = "select CACHE_KEY, ELEMENT from " + getJdbcDiskCacheAttributes().getTableName()
-                + " where REGION = ? and CACHE_KEY like ?";
-
-            try (Connection con = getDataSource().getConnection())
+            try (PreparedStatement psSelect = con.prepareStatement( selectString ))
             {
-                try (PreparedStatement psSelect = con.prepareStatement( selectString ))
-                {
-                    psSelect.setString( 1, this.getCacheName() );
-                    psSelect.setString( 2, constructLikeParameterFromPattern( pattern ) );
+                psSelect.setString( 1, this.getCacheName() );
+                psSelect.setString( 2, constructLikeParameterFromPattern( pattern ) );
 
-                    try (ResultSet rs = psSelect.executeQuery())
+                try (ResultSet rs = psSelect.executeQuery())
+                {
+                    while ( rs.next() )
                     {
-                        while ( rs.next() )
+                        final byte[] data = rs.getBytes(1);
+                        if ( data != null )
                         {
-                            final String key = rs.getString( 1 );
-                            final byte[] data = rs.getBytes( 2 );
-                            if ( data != null )
+                            try
                             {
-                                try
-                                {
-                                    // USE THE SERIALIZER
-                                    final ICacheElement<K, V> value = getElementSerializer().deSerialize( data, null );
-                                    results.put( (K) key, value );
-                                }
-                                catch ( final Exception e )
-                                {
-                                    log.error( "Problem getting items for pattern [{0}]", pattern, e );
-                                }
+                                // USE THE SERIALIZER
+                                final ICacheElement<K, V> value = getElementSerializer().deSerialize( data, null );
+                                results.put( value.getKey(), value );
+                            }
+                            catch ( final IOException | ClassNotFoundException e )
+                            {
+                                log.error( "Problem getting items for pattern [{0}]", pattern, e );
                             }
                         }
                     }
@@ -483,14 +449,6 @@
                     pattern, sqle );
         }
 
-        if ( log.isInfoEnabled() )
-        {
-            if ( getMatchingCount.get() % LOG_INTERVAL == 0 )
-            {
-                // TODO make a log stats method
-                log.info( "Get Matching Count [{0}]", getMatchingCount);
-            }
-        }
         return results;
     }
 
@@ -519,21 +477,18 @@
     protected boolean processRemove( final K key )
     {
         // remove single item.
-        String sql = "delete from " + getJdbcDiskCacheAttributes().getTableName()
-            + " where REGION = ? and CACHE_KEY = ?";
+        final String sqlSingle = String.format("delete from %s where REGION = ? and CACHE_KEY = ?",
+                getJdbcDiskCacheAttributes().getTableName());
+        // remove all keys of the same name group.
+        final String sqlPartial = String.format("delete from %s where REGION = ? and CACHE_KEY like ?",
+                getJdbcDiskCacheAttributes().getTableName());
 
         try (Connection con = getDataSource().getConnection())
         {
-            boolean partial = false;
-            if ( key instanceof String && key.toString().endsWith( NAME_COMPONENT_DELIMITER ) )
-            {
-                // remove all keys of the same name group.
-                sql = "delete from " + getJdbcDiskCacheAttributes().getTableName()
-                    + " where REGION = ? and CACHE_KEY like ?";
-                partial = true;
-            }
+            boolean partial = key.toString().endsWith(ICache.NAME_COMPONENT_DELIMITER);
+            String sql = partial ? sqlPartial : sqlSingle;
 
-            try (PreparedStatement psSelect = con.prepareStatement( sql ))
+            try (PreparedStatement psSelect = con.prepareStatement(sql))
             {
                 psSelect.setString( 1, this.getCacheName() );
                 if ( partial )
@@ -573,10 +528,11 @@
         // it should never get here from the abstract disk cache.
         if ( this.jdbcDiskCacheAttributes.isAllowRemoveAll() )
         {
+            final String sql = String.format("delete from %s where REGION = ?",
+                    getJdbcDiskCacheAttributes().getTableName());
+
             try (Connection con = getDataSource().getConnection())
             {
-                final String sql = "delete from " + getJdbcDiskCacheAttributes().getTableName() + " where REGION = ?";
-
                 try (PreparedStatement psDelete = con.prepareStatement( sql ))
                 {
                     psDelete.setString( 1, this.getCacheName() );
@@ -624,8 +580,8 @@
                 getTableState().setState( TableState.DELETE_RUNNING );
                 final long now = System.currentTimeMillis() / 1000;
 
-                final String sql = "delete from " + getJdbcDiskCacheAttributes().getTableName()
-                    + " where IS_ETERNAL = ? and REGION = ? and ? > SYSTEM_EXPIRE_TIME_SECONDS";
+                final String sql = String.format("delete from %s where IS_ETERNAL = ? and REGION = ?"
+                        + " and ? > SYSTEM_EXPIRE_TIME_SECONDS", getJdbcDiskCacheAttributes().getTableName());
 
                 try (PreparedStatement psDelete = con.prepareStatement( sql ))
                 {
@@ -679,7 +635,7 @@
     @Override
     public void processDispose()
     {
-        final ICacheEvent<K> cacheEvent = createICacheEvent( getCacheName(), (K)"none", ICacheEventLogger.DISPOSE_EVENT );
+        final ICacheEvent<K> cacheEvent = createICacheEvent( getCacheName(), (K)null, ICacheEventLogger.DISPOSE_EVENT );
 
         try
         {
@@ -706,8 +662,8 @@
         int size = 0;
 
         // region, key
-        final String selectString = "select count(*) from " + getJdbcDiskCacheAttributes().getTableName()
-            + " where REGION = ?";
+        final String selectString = String.format("select count(*) from %s where REGION = ?",
+                getJdbcDiskCacheAttributes().getTableName());
 
         try (Connection con = getDataSource().getConnection())
         {
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
index 69a0eca..fc08301 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
@@ -390,7 +390,9 @@
             // get dereferenced, also we don't want one for every region.
             discovery = UDPDiscoveryManager.getInstance().getService( lac.getUdpDiscoveryAddr(),
                                                                       lac.getUdpDiscoveryPort(),
-                                                                      lac.getTcpListenerPort(), cacheMgr);
+                                                                      lac.getTcpListenerPort(),
+                                                                      cacheMgr,
+                                                                      elementSerializer);
 
             discovery.addParticipatingCacheName( lac.getCacheName() );
             discovery.addDiscoveryListener( discoveryListener );
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/CacheGroup.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/CacheGroup.java
index 64fb357..0c79706 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/CacheGroup.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/CacheGroup.java
@@ -22,9 +22,10 @@
 import org.apache.commons.jcs3.engine.behavior.IElementAttributes;
 
 /**
- * Holder for attributes specific to a group. The grouping functionality is on
- * the way out.
+ * Holder for attributes specific to a group.
+ * @deprecated The grouping functionality is on the way out.
  */
+@Deprecated
 public class CacheGroup
 {
     /** Element configuration. */
@@ -47,9 +48,9 @@
     }
 
     /**
-     * Gets the attrributes attribute of the CacheGroup object
+     * Gets the attributes attribute of the CacheGroup object
      * <p>
-     * @return The attrributes value
+     * @return The attributes value
      */
     public IElementAttributes getElementAttrributes()
     {
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupAttrName.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupAttrName.java
index fc194fe..dd150a2 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupAttrName.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupAttrName.java
@@ -20,6 +20,7 @@
  */
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * Description of the Class
@@ -36,9 +37,6 @@
     /** the name of the attribute */
     public final T attrName;
 
-    /** Cached toString value */
-    private String toString;
-
     /**
      * Constructor for the GroupAttrName object
      * @param groupId
@@ -71,16 +69,7 @@
 
         if (groupId.equals( to.groupId ))
         {
-            if (attrName == null && to.attrName == null)
-            {
-                return true;
-            }
-            else if (attrName == null || to.attrName == null)
-            {
-                return false;
-            }
-
-            return  attrName.equals( to.attrName );
+            return Objects.equals(attrName, to.attrName);
         }
 
         return false;
@@ -92,12 +81,7 @@
     @Override
     public int hashCode()
     {
-        if (attrName == null)
-        {
-            return groupId.hashCode();
-        }
-
-        return groupId.hashCode() ^ attrName.hashCode();
+        return Objects.hash(groupId, attrName);
     }
 
     /**
@@ -106,12 +90,7 @@
     @Override
     public String toString()
     {
-        if ( toString == null )
-        {
-            toString = "[GAN: groupId=" + groupId + ", attrName=" + attrName + "]";
-        }
-
-        return toString;
+        return String.format("GAN:%s:%s", groupId, Objects.toString(attrName, ""));
     }
 
 }
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupId.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupId.java
index 86f90b6..fa4bee4 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupId.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupId.java
@@ -20,6 +20,7 @@
  */
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * Used to avoid name conflict when group cache items are mixed with non-group cache items in the
@@ -37,9 +38,6 @@
     /** the name of the region. */
     public final String cacheName;
 
-    /** Cached toString value. */
-    private String toString;
-
     /**
      * Constructor for the GroupId object
      * <p>
@@ -77,27 +75,22 @@
     }
 
     /**
-     * @return cacheName.hashCode() + groupName.hashCode();
+     * @return Objects.hash(cacheName, groupName);
      */
     @Override
     public int hashCode()
     {
-        return cacheName.hashCode() + groupName.hashCode();
+        return Objects.hash(cacheName, groupName);
     }
 
     /**
-     * Caches the value.
-     * <p>
-     * @return debugging string.
+     * Convert to string
+     *
+     * @return the string representation of this ID.
      */
     @Override
     public String toString()
     {
-        if ( toString == null )
-        {
-            toString = "[groupId=" + cacheName + ", " + groupName + ']';
-        }
-
-        return toString;
+        return String.format("[groupId=%s, %s]", cacheName, groupName);
     }
 }
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java
index 1d51838..f07f4de 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java
@@ -23,9 +23,11 @@
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
 import org.apache.commons.jcs3.engine.behavior.IProvideScheduler;
 import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
+import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
 
 /**
  * This manages UDPDiscovery Services. We should end up with one service per Lateral Cache Manager
@@ -60,6 +62,7 @@
         return INSTANCE;
     }
 
+
     /**
      * Creates a service for the address and port if one doesn't exist already.
      * <p>
@@ -71,11 +74,33 @@
      * @param servicePort
      * @param cacheMgr
      * @return UDPDiscoveryService
+     * @deprecated Specify serializer implementation explicitly
      */
+    @Deprecated
     public UDPDiscoveryService getService( final String discoveryAddress, final int discoveryPort, final int servicePort,
                                                         final ICompositeCacheManager cacheMgr )
     {
-        final String key = discoveryAddress + ":" + discoveryPort + ":" + servicePort;
+        return getService(discoveryAddress, discoveryPort, servicePort, cacheMgr, new StandardSerializer());
+    }
+
+    /**
+     * Creates a service for the address and port if one doesn't exist already.
+     * <p>
+     * We need to key this using the listener port too. TODO think of making one discovery service
+     * work for multiple types of clients.
+     * <p>
+     * @param discoveryAddress
+     * @param discoveryPort
+     * @param servicePort
+     * @param cacheMgr
+     * @param serializer
+     *
+     * @return UDPDiscoveryService
+     */
+    public UDPDiscoveryService getService( final String discoveryAddress, final int discoveryPort,
+            final int servicePort, final ICompositeCacheManager cacheMgr, final IElementSerializer serializer )
+    {
+        final String key = String.join(":", discoveryAddress, String.valueOf(discoveryPort), String.valueOf(servicePort));
 
         final UDPDiscoveryService service = services.computeIfAbsent(key, k -> {
             log.info( "Creating service for address:port:servicePort [{0}]", key );
@@ -85,7 +110,7 @@
             attributes.setUdpDiscoveryPort( discoveryPort );
             attributes.setServicePort( servicePort );
 
-            final UDPDiscoveryService newService = new UDPDiscoveryService( attributes );
+            final UDPDiscoveryService newService = new UDPDiscoveryService(attributes, serializer);
 
             // register for shutdown notification
             cacheMgr.registerShutdownObserver( newService );
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
index e269301..60b872c 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
@@ -19,9 +19,7 @@
  * under the License.
  */
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.net.DatagramPacket;
 import java.net.InetAddress;
 import java.net.MulticastSocket;
@@ -32,7 +30,6 @@
 
 import org.apache.commons.jcs3.engine.CacheInfo;
 import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
-import org.apache.commons.jcs3.io.ObjectInputStreamClassLoaderAware;
 import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
 import org.apache.commons.jcs3.utils.net.HostNameUtil;
@@ -47,9 +44,6 @@
     /** The log factory */
     private static final Log log = LogManager.getLog( UDPDiscoveryReceiver.class );
 
-    /** buffer */
-    private final byte[] mBuffer = new byte[65536];
-
     /** The socket used for communication. */
     private MulticastSocket mSocket;
 
@@ -159,7 +153,8 @@
     public Object waitForMessage()
         throws IOException
     {
-        final DatagramPacket packet = new DatagramPacket( mBuffer, mBuffer.length );
+        final byte[] mBuffer = new byte[65536];
+        final DatagramPacket packet = new DatagramPacket(mBuffer, mBuffer.length);
         Object obj = null;
         try
         {
@@ -170,11 +165,7 @@
             log.debug( "Received packet from address [{0}]",
                     () -> packet.getSocketAddress() );
 
-            try (ByteArrayInputStream byteStream = new ByteArrayInputStream(mBuffer, 0, packet.getLength());
-                 ObjectInputStream objectStream = new ObjectInputStreamClassLoaderAware(byteStream, null))
-            {
-                obj = objectStream.readObject();
-            }
+            obj = service.getSerializer().deSerialize(mBuffer, null);
 
             if ( obj instanceof UDPDiscoveryMessage )
             {
@@ -188,7 +179,7 @@
                         packet.getSocketAddress(), obj );
             }
         }
-        catch ( final Exception e )
+        catch ( final IOException | ClassNotFoundException e )
         {
             log.error( "Error receiving multicast packet", e );
         }
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
index b153401..b35be3c 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
@@ -27,6 +27,7 @@
 import java.util.ArrayList;
 
 import org.apache.commons.jcs3.engine.CacheInfo;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
 import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
 import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryMessage.BroadcastType;
@@ -52,7 +53,7 @@
     private final int multicastPort;
 
     /** Used to serialize messages */
-    private final StandardSerializer serializer = new StandardSerializer();
+    private final IElementSerializer serializer;
 
     /**
      * Constructor for the UDPDiscoverySender object
@@ -65,10 +66,31 @@
      * @param port
      * @param udpTTL the Datagram packet time-to-live
      * @throws IOException
+     * @deprecated Specify serializer implementation explicitly
      */
+    @Deprecated
     public UDPDiscoverySender( final String host, final int port, final int udpTTL )
         throws IOException
     {
+        this(host, port, udpTTL, new StandardSerializer());
+    }
+
+    /**
+     * Constructor for the UDPDiscoverySender object
+     * <p>
+     * This sender can be used to send multiple messages.
+     * <p>
+     * When you are done sending, you should destroy the socket sender.
+     * <p>
+     * @param host
+     * @param port
+     * @param udpTTL the Datagram packet time-to-live
+     * @param serializer the Serializer to use when sending messages
+     * @throws IOException
+     */
+    public UDPDiscoverySender( final String host, final int port, final int udpTTL, IElementSerializer serializer)
+        throws IOException
+    {
         try
         {
             log.debug( "Constructing socket for sender on port [{0}]", port );
@@ -89,6 +111,7 @@
         }
 
         this.multicastPort = port;
+        this.serializer = serializer;
     }
 
     /**
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
index bbe4b4e..8d08463 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
@@ -32,12 +32,14 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
 import org.apache.commons.jcs3.engine.behavior.IRequireScheduler;
 import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
 import org.apache.commons.jcs3.log.Log;
 import org.apache.commons.jcs3.log.LogManager;
 import org.apache.commons.jcs3.utils.discovery.behavior.IDiscoveryListener;
 import org.apache.commons.jcs3.utils.net.HostNameUtil;
+import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
 
 /**
  * This service creates a listener that can create lateral caches and add them to the no wait list.
@@ -63,8 +65,11 @@
     /** attributes */
     private UDPDiscoveryAttributes udpDiscoveryAttributes;
 
+    /** Used to serialize messages */
+    private final IElementSerializer serializer;
+
     /** is this shut down? */
-    private AtomicBoolean shutdown = new AtomicBoolean(false);
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
 
     /** This is a set of services that have been discovered. */
     private final ConcurrentMap<Integer, DiscoveredService> discoveredServices = new ConcurrentHashMap<>();
@@ -82,9 +87,24 @@
     private ScheduledFuture<?> cleanupTaskFuture = null;
 
     /**
-     * @param attributes
+     * Constructor
+     *
+     * @param attributes settings of the service
+     * @deprecated Specify serializer implementation explicitly
      */
-    public UDPDiscoveryService( final UDPDiscoveryAttributes attributes)
+    @Deprecated
+    public UDPDiscoveryService(final UDPDiscoveryAttributes attributes)
+    {
+        this(attributes, new StandardSerializer());
+    }
+
+    /**
+     * Constructor
+     *
+     * @param attributes settings of service
+     * @param serializer the serializer to use to send and receive messages
+     */
+    public UDPDiscoveryService(final UDPDiscoveryAttributes attributes, IElementSerializer serializer)
     {
         udpDiscoveryAttributes = attributes.clone();
 
@@ -114,6 +134,8 @@
                     getUdpDiscoveryAttributes().getUdpDiscoveryPort(), e );
         }
 
+        this.serializer = serializer;
+
         // initiate sender broadcast
         initiateBroadcast();
     }
@@ -177,7 +199,8 @@
         try (UDPDiscoverySender sender = new UDPDiscoverySender(
                 getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
                 getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
-                getUdpDiscoveryAttributes().getUdpTTL()))
+                getUdpDiscoveryAttributes().getUdpTTL(),
+                getSerializer()))
         {
             sender.requestBroadcast();
 
@@ -202,7 +225,8 @@
         try (UDPDiscoverySender sender = new UDPDiscoverySender(
                 getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
                 getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
-                getUdpDiscoveryAttributes().getUdpTTL()))
+                getUdpDiscoveryAttributes().getUdpTTL(),
+                getSerializer()))
         {
             sender.passiveBroadcast(
                     getUdpDiscoveryAttributes().getServiceAddress(),
@@ -230,7 +254,8 @@
         try (UDPDiscoverySender sender = new UDPDiscoverySender(
                 getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
                 getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
-                getUdpDiscoveryAttributes().getUdpTTL()))
+                getUdpDiscoveryAttributes().getUdpTTL(),
+                getSerializer()))
         {
             sender.removeBroadcast(
                     getUdpDiscoveryAttributes().getServiceAddress(),
@@ -335,6 +360,16 @@
     }
 
     /**
+     * Return the serializer implementation
+     *
+     * @return the serializer
+     */
+    public IElementSerializer getSerializer()
+    {
+        return serializer;
+    }
+
+    /**
      * Start necessary receiver thread
      */
     public void startup()
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/serialization/CompressingSerializer.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/serialization/CompressingSerializer.java
index 9043822..b9ffb80 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/serialization/CompressingSerializer.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/serialization/CompressingSerializer.java
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
 import org.apache.commons.jcs3.utils.zip.CompressionUtil;
 
 /**
@@ -28,6 +29,28 @@
  */
 public class CompressingSerializer extends StandardSerializer
 {
+    /** Wrapped serializer */
+    private final IElementSerializer serializer;
+
+
+    /**
+     * Default constructor
+     */
+    public CompressingSerializer()
+    {
+        this(new StandardSerializer());
+    }
+
+    /**
+     * Wrapper constructor
+     *
+     * @param serializer the wrapped serializer
+     */
+    public CompressingSerializer(IElementSerializer serializer)
+    {
+        this.serializer = serializer;
+    }
+
     /**
      * Serializes an object using default serialization. Compresses the byte array.
      * <p>
@@ -39,7 +62,7 @@
     public <T> byte[] serialize( final T obj )
         throws IOException
     {
-        final byte[] uncompressed = super.serialize(obj);
+        final byte[] uncompressed = serializer.serialize(obj);
         final byte[] compressed = CompressionUtil.compressByteArray( uncompressed );
         return compressed;
     }
@@ -64,6 +87,6 @@
         }
 
         final byte[] decompressedByteArray = CompressionUtil.decompressByteArray( data );
-        return super.deSerialize(decompressedByteArray, loader);
+        return serializer.deSerialize(decompressedByteArray, loader);
     }
 }