Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/commons-jcs.git
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/jdbc/JDBCDiskCache.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/jdbc/JDBCDiskCache.java
index b425a61..4c8ede0 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/jdbc/JDBCDiskCache.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/disk/jdbc/JDBCDiskCache.java
@@ -37,6 +37,7 @@
import org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes;
import org.apache.commons.jcs3.auxiliary.disk.AbstractDiskCache;
import org.apache.commons.jcs3.auxiliary.disk.jdbc.dsfactory.DataSourceFactory;
+import org.apache.commons.jcs3.engine.behavior.ICache;
import org.apache.commons.jcs3.engine.behavior.ICacheElement;
import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
import org.apache.commons.jcs3.engine.logging.behavior.ICacheEvent;
@@ -99,9 +100,6 @@
/** # of times getMatching was called */
private final AtomicInteger getMatchingCount = new AtomicInteger(0);
- /** if count % interval == 0 then log */
- private static final int LOG_INTERVAL = 100;
-
/** db connection pool */
private final DataSourceFactory dsFactory;
@@ -150,33 +148,20 @@
{
log.debug( "Putting [{0}] on disk.", () -> ce.getKey());
- final byte[] element;
-
try
{
- element = getElementSerializer().serialize( ce );
+ final byte[] element = getElementSerializer().serialize( ce );
+ insertOrUpdate( ce, con, element );
}
catch ( final IOException e )
{
log.error( "Could not serialize element", e );
- return;
}
-
- insertOrUpdate( ce, con, element );
}
catch ( final SQLException e )
{
log.error( "Problem getting connection.", e );
}
-
- if ( log.isInfoEnabled() )
- {
- if ( updateCount.get() % LOG_INTERVAL == 0 )
- {
- // TODO make a log stats method
- log.info( "Update Count [{0}]", updateCount);
- }
- }
}
/**
@@ -222,25 +207,19 @@
private boolean insertRow( final ICacheElement<K, V> ce, final Connection con, final byte[] element )
{
boolean exists = false;
- final String sqlI = "insert into "
- + getJdbcDiskCacheAttributes().getTableName()
- + " (CACHE_KEY, REGION, ELEMENT, MAX_LIFE_SECONDS, IS_ETERNAL, CREATE_TIME, UPDATE_TIME_SECONDS, SYSTEM_EXPIRE_TIME_SECONDS) "
- + " values (?, ?, ?, ?, ?, ?, ?, ?)";
+ final String sqlI = String.format("insert into %s"
+ + " (CACHE_KEY, REGION, ELEMENT, MAX_LIFE_SECONDS, IS_ETERNAL, CREATE_TIME, UPDATE_TIME_SECONDS,"
+ + " SYSTEM_EXPIRE_TIME_SECONDS) "
+ + " values (?, ?, ?, ?, ?, ?, ?, ?)", getJdbcDiskCacheAttributes().getTableName());
try (PreparedStatement psInsert = con.prepareStatement( sqlI ))
{
- psInsert.setString( 1, (String) ce.getKey() );
+ psInsert.setString( 1, ce.getKey().toString() );
psInsert.setString( 2, this.getCacheName() );
psInsert.setBytes( 3, element );
psInsert.setLong( 4, ce.getElementAttributes().getMaxLife() );
- if ( ce.getElementAttributes().getIsEternal() )
- {
- psInsert.setString( 5, "T" );
- }
- else
- {
- psInsert.setString( 5, "F" );
- }
+ psInsert.setString( 5, ce.getElementAttributes().getIsEternal() ? "T" : "F" );
+
final Timestamp createTime = new Timestamp( ce.getElementAttributes().getCreateTime() );
psInsert.setTimestamp( 6, createTime );
@@ -282,9 +261,9 @@
*/
private void updateRow( final ICacheElement<K, V> ce, final Connection con, final byte[] element )
{
- final String sqlU = "update " + getJdbcDiskCacheAttributes().getTableName()
+ final String sqlU = String.format("update %s"
+ " set ELEMENT = ?, CREATE_TIME = ?, UPDATE_TIME_SECONDS = ?, " + " SYSTEM_EXPIRE_TIME_SECONDS = ? "
- + " where CACHE_KEY = ? and REGION = ?";
+ + " where CACHE_KEY = ? and REGION = ?", getJdbcDiskCacheAttributes().getTableName());
try (PreparedStatement psUpdate = con.prepareStatement( sqlU ))
{
@@ -322,8 +301,8 @@
{
boolean exists = false;
// don't select the element, since we want this to be fast.
- final String sqlS = "select CACHE_KEY from " + getJdbcDiskCacheAttributes().getTableName()
- + " where REGION = ? and CACHE_KEY = ?";
+ final String sqlS = String.format("select CACHE_KEY from %s where REGION = ? and CACHE_KEY = ?",
+ getJdbcDiskCacheAttributes().getTableName());
try (PreparedStatement psSelect = con.prepareStatement( sqlS ))
{
@@ -366,37 +345,36 @@
ICacheElement<K, V> obj = null;
- byte[] data = null;
- try
+ // region, key
+ final String selectString = String.format("select ELEMENT from %s where REGION = ? and CACHE_KEY = ?",
+ getJdbcDiskCacheAttributes().getTableName());
+
+ try (Connection con = getDataSource().getConnection())
{
- // region, key
- final String selectString = "select ELEMENT from " + getJdbcDiskCacheAttributes().getTableName()
- + " where REGION = ? and CACHE_KEY = ?";
-
- try (Connection con = getDataSource().getConnection())
+ try (PreparedStatement psSelect = con.prepareStatement( selectString ))
{
- try (PreparedStatement psSelect = con.prepareStatement( selectString ))
- {
- psSelect.setString( 1, this.getCacheName() );
- psSelect.setString( 2, key.toString() );
+ psSelect.setString( 1, this.getCacheName() );
+ psSelect.setString( 2, key.toString() );
- try (ResultSet rs = psSelect.executeQuery())
+ try (ResultSet rs = psSelect.executeQuery())
+ {
+ byte[] data = null;
+
+ if ( rs.next() )
{
- if ( rs.next() )
+ data = rs.getBytes( 1 );
+ }
+
+ if ( data != null )
+ {
+ try
{
- data = rs.getBytes( 1 );
+ // USE THE SERIALIZER
+ obj = getElementSerializer().deSerialize( data, null );
}
- if ( data != null )
+ catch ( final IOException | ClassNotFoundException e )
{
- try
- {
- // USE THE SERIALIZER
- obj = getElementSerializer().deSerialize( data, null );
- }
- catch ( final Exception e )
- {
- log.error( "Problem getting item for key [{0}]", key, e );
- }
+ log.error( "Problem getting item for key [{0}]", key, e );
}
}
}
@@ -408,14 +386,6 @@
key, sqle );
}
- if ( log.isInfoEnabled() )
- {
- if ( getCount.get() % LOG_INTERVAL == 0 )
- {
- // TODO make a log stats method
- log.info( "Get Count [{0}]", getCount );
- }
- }
return obj;
}
@@ -440,37 +410,33 @@
final Map<K, ICacheElement<K, V>> results = new HashMap<>();
- try
+ // region, key
+ final String selectString = String.format("select ELEMENT from %s where REGION = ? and CACHE_KEY like ?",
+ getJdbcDiskCacheAttributes().getTableName());
+
+ try (Connection con = getDataSource().getConnection())
{
- // region, key
- final String selectString = "select CACHE_KEY, ELEMENT from " + getJdbcDiskCacheAttributes().getTableName()
- + " where REGION = ? and CACHE_KEY like ?";
-
- try (Connection con = getDataSource().getConnection())
+ try (PreparedStatement psSelect = con.prepareStatement( selectString ))
{
- try (PreparedStatement psSelect = con.prepareStatement( selectString ))
- {
- psSelect.setString( 1, this.getCacheName() );
- psSelect.setString( 2, constructLikeParameterFromPattern( pattern ) );
+ psSelect.setString( 1, this.getCacheName() );
+ psSelect.setString( 2, constructLikeParameterFromPattern( pattern ) );
- try (ResultSet rs = psSelect.executeQuery())
+ try (ResultSet rs = psSelect.executeQuery())
+ {
+ while ( rs.next() )
{
- while ( rs.next() )
+ final byte[] data = rs.getBytes(1);
+ if ( data != null )
{
- final String key = rs.getString( 1 );
- final byte[] data = rs.getBytes( 2 );
- if ( data != null )
+ try
{
- try
- {
- // USE THE SERIALIZER
- final ICacheElement<K, V> value = getElementSerializer().deSerialize( data, null );
- results.put( (K) key, value );
- }
- catch ( final Exception e )
- {
- log.error( "Problem getting items for pattern [{0}]", pattern, e );
- }
+ // USE THE SERIALIZER
+ final ICacheElement<K, V> value = getElementSerializer().deSerialize( data, null );
+ results.put( value.getKey(), value );
+ }
+ catch ( final IOException | ClassNotFoundException e )
+ {
+ log.error( "Problem getting items for pattern [{0}]", pattern, e );
}
}
}
@@ -483,14 +449,6 @@
pattern, sqle );
}
- if ( log.isInfoEnabled() )
- {
- if ( getMatchingCount.get() % LOG_INTERVAL == 0 )
- {
- // TODO make a log stats method
- log.info( "Get Matching Count [{0}]", getMatchingCount);
- }
- }
return results;
}
@@ -519,21 +477,18 @@
protected boolean processRemove( final K key )
{
// remove single item.
- String sql = "delete from " + getJdbcDiskCacheAttributes().getTableName()
- + " where REGION = ? and CACHE_KEY = ?";
+ final String sqlSingle = String.format("delete from %s where REGION = ? and CACHE_KEY = ?",
+ getJdbcDiskCacheAttributes().getTableName());
+ // remove all keys of the same name group.
+ final String sqlPartial = String.format("delete from %s where REGION = ? and CACHE_KEY like ?",
+ getJdbcDiskCacheAttributes().getTableName());
try (Connection con = getDataSource().getConnection())
{
- boolean partial = false;
- if ( key instanceof String && key.toString().endsWith( NAME_COMPONENT_DELIMITER ) )
- {
- // remove all keys of the same name group.
- sql = "delete from " + getJdbcDiskCacheAttributes().getTableName()
- + " where REGION = ? and CACHE_KEY like ?";
- partial = true;
- }
+ boolean partial = key.toString().endsWith(ICache.NAME_COMPONENT_DELIMITER);
+ String sql = partial ? sqlPartial : sqlSingle;
- try (PreparedStatement psSelect = con.prepareStatement( sql ))
+ try (PreparedStatement psSelect = con.prepareStatement(sql))
{
psSelect.setString( 1, this.getCacheName() );
if ( partial )
@@ -573,10 +528,11 @@
// it should never get here from the abstract disk cache.
if ( this.jdbcDiskCacheAttributes.isAllowRemoveAll() )
{
+ final String sql = String.format("delete from %s where REGION = ?",
+ getJdbcDiskCacheAttributes().getTableName());
+
try (Connection con = getDataSource().getConnection())
{
- final String sql = "delete from " + getJdbcDiskCacheAttributes().getTableName() + " where REGION = ?";
-
try (PreparedStatement psDelete = con.prepareStatement( sql ))
{
psDelete.setString( 1, this.getCacheName() );
@@ -624,8 +580,8 @@
getTableState().setState( TableState.DELETE_RUNNING );
final long now = System.currentTimeMillis() / 1000;
- final String sql = "delete from " + getJdbcDiskCacheAttributes().getTableName()
- + " where IS_ETERNAL = ? and REGION = ? and ? > SYSTEM_EXPIRE_TIME_SECONDS";
+ final String sql = String.format("delete from %s where IS_ETERNAL = ? and REGION = ?"
+ + " and ? > SYSTEM_EXPIRE_TIME_SECONDS", getJdbcDiskCacheAttributes().getTableName());
try (PreparedStatement psDelete = con.prepareStatement( sql ))
{
@@ -679,7 +635,7 @@
@Override
public void processDispose()
{
- final ICacheEvent<K> cacheEvent = createICacheEvent( getCacheName(), (K)"none", ICacheEventLogger.DISPOSE_EVENT );
+ final ICacheEvent<K> cacheEvent = createICacheEvent( getCacheName(), (K)null, ICacheEventLogger.DISPOSE_EVENT );
try
{
@@ -706,8 +662,8 @@
int size = 0;
// region, key
- final String selectString = "select count(*) from " + getJdbcDiskCacheAttributes().getTableName()
- + " where REGION = ?";
+ final String selectString = String.format("select count(*) from %s where REGION = ?",
+ getJdbcDiskCacheAttributes().getTableName());
try (Connection con = getDataSource().getConnection())
{
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
index 69a0eca..fc08301 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
@@ -390,7 +390,9 @@
// get dereferenced, also we don't want one for every region.
discovery = UDPDiscoveryManager.getInstance().getService( lac.getUdpDiscoveryAddr(),
lac.getUdpDiscoveryPort(),
- lac.getTcpListenerPort(), cacheMgr);
+ lac.getTcpListenerPort(),
+ cacheMgr,
+ elementSerializer);
discovery.addParticipatingCacheName( lac.getCacheName() );
discovery.addDiscoveryListener( discoveryListener );
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/CacheGroup.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/CacheGroup.java
index 64fb357..0c79706 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/CacheGroup.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/CacheGroup.java
@@ -22,9 +22,10 @@
import org.apache.commons.jcs3.engine.behavior.IElementAttributes;
/**
- * Holder for attributes specific to a group. The grouping functionality is on
- * the way out.
+ * Holder for attributes specific to a group.
+ * @deprecated The grouping functionality is on the way out.
*/
+@Deprecated
public class CacheGroup
{
/** Element configuration. */
@@ -47,9 +48,9 @@
}
/**
- * Gets the attrributes attribute of the CacheGroup object
+ * Gets the attributes attribute of the CacheGroup object
* <p>
- * @return The attrributes value
+ * @return The attributes value
*/
public IElementAttributes getElementAttrributes()
{
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupAttrName.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupAttrName.java
index fc194fe..dd150a2 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupAttrName.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupAttrName.java
@@ -20,6 +20,7 @@
*/
import java.io.Serializable;
+import java.util.Objects;
/**
* Description of the Class
@@ -36,9 +37,6 @@
/** the name of the attribute */
public final T attrName;
- /** Cached toString value */
- private String toString;
-
/**
* Constructor for the GroupAttrName object
* @param groupId
@@ -71,16 +69,7 @@
if (groupId.equals( to.groupId ))
{
- if (attrName == null && to.attrName == null)
- {
- return true;
- }
- else if (attrName == null || to.attrName == null)
- {
- return false;
- }
-
- return attrName.equals( to.attrName );
+ return Objects.equals(attrName, to.attrName);
}
return false;
@@ -92,12 +81,7 @@
@Override
public int hashCode()
{
- if (attrName == null)
- {
- return groupId.hashCode();
- }
-
- return groupId.hashCode() ^ attrName.hashCode();
+ return Objects.hash(groupId, attrName);
}
/**
@@ -106,12 +90,7 @@
@Override
public String toString()
{
- if ( toString == null )
- {
- toString = "[GAN: groupId=" + groupId + ", attrName=" + attrName + "]";
- }
-
- return toString;
+ return String.format("GAN:%s:%s", groupId, Objects.toString(attrName, ""));
}
}
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupId.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupId.java
index 86f90b6..fa4bee4 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupId.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/control/group/GroupId.java
@@ -20,6 +20,7 @@
*/
import java.io.Serializable;
+import java.util.Objects;
/**
* Used to avoid name conflict when group cache items are mixed with non-group cache items in the
@@ -37,9 +38,6 @@
/** the name of the region. */
public final String cacheName;
- /** Cached toString value. */
- private String toString;
-
/**
* Constructor for the GroupId object
* <p>
@@ -77,27 +75,22 @@
}
/**
- * @return cacheName.hashCode() + groupName.hashCode();
+ * @return Objects.hash(cacheName, groupName);
*/
@Override
public int hashCode()
{
- return cacheName.hashCode() + groupName.hashCode();
+ return Objects.hash(cacheName, groupName);
}
/**
- * Caches the value.
- * <p>
- * @return debugging string.
+ * Convert to string
+ *
+ * @return the string representation of this ID.
*/
@Override
public String toString()
{
- if ( toString == null )
- {
- toString = "[groupId=" + cacheName + ", " + groupName + ']';
- }
-
- return toString;
+ return String.format("[groupId=%s, %s]", cacheName, groupName);
}
}
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java
index 1d51838..f07f4de 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryManager.java
@@ -23,9 +23,11 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
import org.apache.commons.jcs3.engine.behavior.IProvideScheduler;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
+import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
/**
* This manages UDPDiscovery Services. We should end up with one service per Lateral Cache Manager
@@ -60,6 +62,7 @@
return INSTANCE;
}
+
/**
* Creates a service for the address and port if one doesn't exist already.
* <p>
@@ -71,11 +74,33 @@
* @param servicePort
* @param cacheMgr
* @return UDPDiscoveryService
+ * @deprecated Specify serializer implementation explicitly
*/
+ @Deprecated
public UDPDiscoveryService getService( final String discoveryAddress, final int discoveryPort, final int servicePort,
final ICompositeCacheManager cacheMgr )
{
- final String key = discoveryAddress + ":" + discoveryPort + ":" + servicePort;
+ return getService(discoveryAddress, discoveryPort, servicePort, cacheMgr, new StandardSerializer());
+ }
+
+ /**
+ * Creates a service for the address and port if one doesn't exist already.
+ * <p>
+ * We need to key this using the listener port too. TODO think of making one discovery service
+ * work for multiple types of clients.
+ * <p>
+ * @param discoveryAddress
+ * @param discoveryPort
+ * @param servicePort
+ * @param cacheMgr
+ * @param serializer
+ *
+ * @return UDPDiscoveryService
+ */
+ public UDPDiscoveryService getService( final String discoveryAddress, final int discoveryPort,
+ final int servicePort, final ICompositeCacheManager cacheMgr, final IElementSerializer serializer )
+ {
+ final String key = String.join(":", discoveryAddress, String.valueOf(discoveryPort), String.valueOf(servicePort));
final UDPDiscoveryService service = services.computeIfAbsent(key, k -> {
log.info( "Creating service for address:port:servicePort [{0}]", key );
@@ -85,7 +110,7 @@
attributes.setUdpDiscoveryPort( discoveryPort );
attributes.setServicePort( servicePort );
- final UDPDiscoveryService newService = new UDPDiscoveryService( attributes );
+ final UDPDiscoveryService newService = new UDPDiscoveryService(attributes, serializer);
// register for shutdown notification
cacheMgr.registerShutdownObserver( newService );
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
index e269301..60b872c 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java
@@ -19,9 +19,7 @@
* under the License.
*/
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
@@ -32,7 +30,6 @@
import org.apache.commons.jcs3.engine.CacheInfo;
import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
-import org.apache.commons.jcs3.io.ObjectInputStreamClassLoaderAware;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.net.HostNameUtil;
@@ -47,9 +44,6 @@
/** The log factory */
private static final Log log = LogManager.getLog( UDPDiscoveryReceiver.class );
- /** buffer */
- private final byte[] mBuffer = new byte[65536];
-
/** The socket used for communication. */
private MulticastSocket mSocket;
@@ -159,7 +153,8 @@
public Object waitForMessage()
throws IOException
{
- final DatagramPacket packet = new DatagramPacket( mBuffer, mBuffer.length );
+ final byte[] mBuffer = new byte[65536];
+ final DatagramPacket packet = new DatagramPacket(mBuffer, mBuffer.length);
Object obj = null;
try
{
@@ -170,11 +165,7 @@
log.debug( "Received packet from address [{0}]",
() -> packet.getSocketAddress() );
- try (ByteArrayInputStream byteStream = new ByteArrayInputStream(mBuffer, 0, packet.getLength());
- ObjectInputStream objectStream = new ObjectInputStreamClassLoaderAware(byteStream, null))
- {
- obj = objectStream.readObject();
- }
+ obj = service.getSerializer().deSerialize(mBuffer, null);
if ( obj instanceof UDPDiscoveryMessage )
{
@@ -188,7 +179,7 @@
packet.getSocketAddress(), obj );
}
}
- catch ( final Exception e )
+ catch ( final IOException | ClassNotFoundException e )
{
log.error( "Error receiving multicast packet", e );
}
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
index b153401..b35be3c 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoverySender.java
@@ -27,6 +27,7 @@
import java.util.ArrayList;
import org.apache.commons.jcs3.engine.CacheInfo;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryMessage.BroadcastType;
@@ -52,7 +53,7 @@
private final int multicastPort;
/** Used to serialize messages */
- private final StandardSerializer serializer = new StandardSerializer();
+ private final IElementSerializer serializer;
/**
* Constructor for the UDPDiscoverySender object
@@ -65,10 +66,31 @@
* @param port
* @param udpTTL the Datagram packet time-to-live
* @throws IOException
+ * @deprecated Specify serializer implementation explicitly
*/
+ @Deprecated
public UDPDiscoverySender( final String host, final int port, final int udpTTL )
throws IOException
{
+ this(host, port, udpTTL, new StandardSerializer());
+ }
+
+ /**
+ * Constructor for the UDPDiscoverySender object
+ * <p>
+ * This sender can be used to send multiple messages.
+ * <p>
+ * When you are done sending, you should destroy the socket sender.
+ * <p>
+ * @param host
+ * @param port
+ * @param udpTTL the Datagram packet time-to-live
+ * @param serializer the Serializer to use when sending messages
+ * @throws IOException
+ */
+ public UDPDiscoverySender( final String host, final int port, final int udpTTL, IElementSerializer serializer)
+ throws IOException
+ {
try
{
log.debug( "Constructing socket for sender on port [{0}]", port );
@@ -89,6 +111,7 @@
}
this.multicastPort = port;
+ this.serializer = serializer;
}
/**
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
index bbe4b4e..8d08463 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryService.java
@@ -32,12 +32,14 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
import org.apache.commons.jcs3.engine.behavior.IRequireScheduler;
import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.discovery.behavior.IDiscoveryListener;
import org.apache.commons.jcs3.utils.net.HostNameUtil;
+import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
/**
* This service creates a listener that can create lateral caches and add them to the no wait list.
@@ -63,8 +65,11 @@
/** attributes */
private UDPDiscoveryAttributes udpDiscoveryAttributes;
+ /** Used to serialize messages */
+ private final IElementSerializer serializer;
+
/** is this shut down? */
- private AtomicBoolean shutdown = new AtomicBoolean(false);
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
/** This is a set of services that have been discovered. */
private final ConcurrentMap<Integer, DiscoveredService> discoveredServices = new ConcurrentHashMap<>();
@@ -82,9 +87,24 @@
private ScheduledFuture<?> cleanupTaskFuture = null;
/**
- * @param attributes
+ * Constructor
+ *
+ * @param attributes settings of the service
+ * @deprecated Specify serializer implementation explicitly
*/
- public UDPDiscoveryService( final UDPDiscoveryAttributes attributes)
+ @Deprecated
+ public UDPDiscoveryService(final UDPDiscoveryAttributes attributes)
+ {
+ this(attributes, new StandardSerializer());
+ }
+
+ /**
+ * Constructor
+ *
+ * @param attributes settings of service
+ * @param serializer the serializer to use to send and receive messages
+ */
+ public UDPDiscoveryService(final UDPDiscoveryAttributes attributes, IElementSerializer serializer)
{
udpDiscoveryAttributes = attributes.clone();
@@ -114,6 +134,8 @@
getUdpDiscoveryAttributes().getUdpDiscoveryPort(), e );
}
+ this.serializer = serializer;
+
// initiate sender broadcast
initiateBroadcast();
}
@@ -177,7 +199,8 @@
try (UDPDiscoverySender sender = new UDPDiscoverySender(
getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
- getUdpDiscoveryAttributes().getUdpTTL()))
+ getUdpDiscoveryAttributes().getUdpTTL(),
+ getSerializer()))
{
sender.requestBroadcast();
@@ -202,7 +225,8 @@
try (UDPDiscoverySender sender = new UDPDiscoverySender(
getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
- getUdpDiscoveryAttributes().getUdpTTL()))
+ getUdpDiscoveryAttributes().getUdpTTL(),
+ getSerializer()))
{
sender.passiveBroadcast(
getUdpDiscoveryAttributes().getServiceAddress(),
@@ -230,7 +254,8 @@
try (UDPDiscoverySender sender = new UDPDiscoverySender(
getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
- getUdpDiscoveryAttributes().getUdpTTL()))
+ getUdpDiscoveryAttributes().getUdpTTL(),
+ getSerializer()))
{
sender.removeBroadcast(
getUdpDiscoveryAttributes().getServiceAddress(),
@@ -335,6 +360,16 @@
}
/**
+ * Return the serializer implementation
+ *
+ * @return the serializer
+ */
+ public IElementSerializer getSerializer()
+ {
+ return serializer;
+ }
+
+ /**
* Start necessary receiver thread
*/
public void startup()
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/serialization/CompressingSerializer.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/serialization/CompressingSerializer.java
index 9043822..b9ffb80 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/serialization/CompressingSerializer.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/utils/serialization/CompressingSerializer.java
@@ -21,6 +21,7 @@
import java.io.IOException;
+import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
import org.apache.commons.jcs3.utils.zip.CompressionUtil;
/**
@@ -28,6 +29,28 @@
*/
public class CompressingSerializer extends StandardSerializer
{
+ /** Wrapped serializer */
+ private final IElementSerializer serializer;
+
+
+ /**
+ * Default constructor
+ */
+ public CompressingSerializer()
+ {
+ this(new StandardSerializer());
+ }
+
+ /**
+ * Wrapper constructor
+ *
+ * @param serializer the wrapped serializer
+ */
+ public CompressingSerializer(IElementSerializer serializer)
+ {
+ this.serializer = serializer;
+ }
+
/**
* Serializes an object using default serialization. Compresses the byte array.
* <p>
@@ -39,7 +62,7 @@
public <T> byte[] serialize( final T obj )
throws IOException
{
- final byte[] uncompressed = super.serialize(obj);
+ final byte[] uncompressed = serializer.serialize(obj);
final byte[] compressed = CompressionUtil.compressByteArray( uncompressed );
return compressed;
}
@@ -64,6 +87,6 @@
}
final byte[] decompressedByteArray = CompressionUtil.decompressByteArray( data );
- return super.deSerialize(decompressedByteArray, loader);
+ return serializer.deSerialize(decompressedByteArray, loader);
}
}