package org.apache.jcs.auxiliary.remote; | |
/* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
*/ | |
import java.io.IOException; | |
import java.io.Serializable; | |
import java.lang.reflect.InvocationTargetException; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.Iterator; | |
import java.util.Map; | |
import java.util.Set; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.jcs.auxiliary.AbstractAuxiliaryCacheEventLogging; | |
import org.apache.jcs.auxiliary.AuxiliaryCacheAttributes; | |
import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes; | |
import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheClient; | |
import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener; | |
import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheService; | |
import org.apache.jcs.engine.CacheConstants; | |
import org.apache.jcs.engine.behavior.ICacheElement; | |
import org.apache.jcs.engine.behavior.ICacheElementSerialized; | |
import org.apache.jcs.engine.behavior.IZombie; | |
import org.apache.jcs.engine.logging.behavior.ICacheEventLogger; | |
import org.apache.jcs.engine.stats.StatElement; | |
import org.apache.jcs.engine.stats.Stats; | |
import org.apache.jcs.engine.stats.behavior.IStatElement; | |
import org.apache.jcs.engine.stats.behavior.IStats; | |
import org.apache.jcs.utils.serialization.SerializationConversionUtil; | |
import org.apache.jcs.utils.threadpool.ThreadPool; | |
import org.apache.jcs.utils.threadpool.ThreadPoolManager; | |
import EDU.oswego.cs.dl.util.concurrent.Callable; | |
import EDU.oswego.cs.dl.util.concurrent.FutureResult; | |
import EDU.oswego.cs.dl.util.concurrent.TimeoutException; | |
/** Abstract base for remote caches. I'm trying to break out and reuse common functionality. */ | |
public abstract class AbstractRemoteAuxiliaryCache | |
extends AbstractAuxiliaryCacheEventLogging | |
implements IRemoteCacheClient | |
{ | |
/** Don't change. */ | |
private static final long serialVersionUID = -5329231850422826461L; | |
/** The logger. */ | |
private final static Log log = LogFactory.getLog( AbstractRemoteAuxiliaryCache.class ); | |
/** | |
* This does the work. In an RMI instances, it will be a remote reference. In an http remote | |
* cache it will be an http client. In zombie mode it is replaced with a balking facade. | |
*/ | |
private IRemoteCacheService remoteCacheService; | |
/** The cacheName */ | |
protected final String cacheName; | |
/** The listener. This can be null. */ | |
private IRemoteCacheListener remoteCacheListener; | |
/** The configuration values. TODO, we'll need a base here. */ | |
private IRemoteCacheAttributes remoteCacheAttributes; | |
/** A thread pool for gets if configured. */ | |
private ThreadPool pool = null; | |
/** Should we get asynchronously using a pool. */ | |
private boolean usePoolForGet = false; | |
/** | |
* Creates the base. | |
* <p> | |
* @param cattr | |
* @param remote | |
* @param listener | |
*/ | |
public AbstractRemoteAuxiliaryCache( IRemoteCacheAttributes cattr, IRemoteCacheService remote, | |
IRemoteCacheListener listener ) | |
{ | |
this.setRemoteCacheAttributes( cattr ); | |
this.cacheName = cattr.getCacheName(); | |
this.setRemoteCacheService( remote ); | |
this.setRemoteCacheListener( listener ); | |
if ( log.isDebugEnabled() ) | |
{ | |
log.debug( "Construct> cacheName=" + cattr.getCacheName() ); | |
log.debug( "irca = " + getRemoteCacheAttributes() ); | |
log.debug( "remote = " + remote ); | |
log.debug( "listener = " + listener ); | |
} | |
// use a pool if it is greater than 0 | |
if ( log.isDebugEnabled() ) | |
{ | |
log.debug( "GetTimeoutMillis() = " + getRemoteCacheAttributes().getGetTimeoutMillis() ); | |
} | |
if ( getRemoteCacheAttributes().getGetTimeoutMillis() > 0 ) | |
{ | |
pool = ThreadPoolManager.getInstance().getPool( getRemoteCacheAttributes().getThreadPoolName() ); | |
if ( log.isDebugEnabled() ) | |
{ | |
log.debug( "Thread Pool = " + pool ); | |
} | |
if ( pool != null ) | |
{ | |
usePoolForGet = true; | |
} | |
} | |
} | |
/** | |
* Synchronously dispose the remote cache; if failed, replace the remote handle with a zombie. | |
* <p> | |
* @throws IOException | |
*/ | |
protected void processDispose() | |
throws IOException | |
{ | |
if ( log.isInfoEnabled() ) | |
{ | |
log.info( "Disposing of remote cache." ); | |
} | |
try | |
{ | |
if ( getRemoteCacheListener() != null ) | |
{ | |
getRemoteCacheListener().dispose(); | |
} | |
} | |
catch ( Exception ex ) | |
{ | |
log.error( "Couldn't dispose", ex ); | |
handleException( ex, "Failed to dispose [" + cacheName + "]", ICacheEventLogger.DISPOSE_EVENT ); | |
} | |
} | |
/** | |
* Synchronously get from the remote cache; if failed, replace the remote handle with a zombie. | |
* <p> | |
* Use threadpool to timeout if a value is set for GetTimeoutMillis | |
* <p> | |
* If we are a cluster client, we need to leave the Element in its serialized form. Cluster | |
* clients cannot deserialize objects. Cluster clients get ICacheElementSerialized objects from | |
* other remote servers. | |
* <p> | |
* @param key | |
* @return ICacheElement, a wrapper around the key, value, and attributes | |
* @throws IOException | |
*/ | |
protected ICacheElement processGet( Serializable key ) | |
throws IOException | |
{ | |
ICacheElement retVal = null; | |
try | |
{ | |
if ( usePoolForGet ) | |
{ | |
retVal = getUsingPool( key ); | |
} | |
else | |
{ | |
retVal = getRemoteCacheService().get( cacheName, key, getListenerId() ); | |
} | |
// Eventually the instance of will not be necessary. | |
if ( retVal != null && retVal instanceof ICacheElementSerialized ) | |
{ | |
// Never try to deserialize if you are a cluster client. Cluster | |
// clients are merely intra-remote cache communicators. Remote caches are assumed | |
// to have no ability to deserialze the objects. | |
if ( this.getRemoteCacheAttributes().getRemoteType() != IRemoteCacheAttributes.CLUSTER ) | |
{ | |
retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized) retVal, | |
this.elementSerializer ); | |
} | |
} | |
} | |
catch ( Exception ex ) | |
{ | |
handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]", ICacheEventLogger.GET_EVENT ); | |
} | |
return retVal; | |
} | |
/** | |
* This allows gets to timeout in case of remote server machine shutdown. | |
* <p> | |
* @param key | |
* @return ICacheElement | |
* @throws IOException | |
*/ | |
public ICacheElement getUsingPool( final Serializable key ) | |
throws IOException | |
{ | |
int timeout = getRemoteCacheAttributes().getGetTimeoutMillis(); | |
try | |
{ | |
FutureResult future = new FutureResult(); | |
Runnable command = future.setter( new Callable() | |
{ | |
public Object call() | |
throws IOException | |
{ | |
return getRemoteCacheService().get( cacheName, key, getListenerId() ); | |
} | |
} ); | |
// execute using the pool | |
pool.execute( command ); | |
// used timed get in order to timeout | |
ICacheElement ice = (ICacheElement) future.timedGet( timeout ); | |
if ( log.isDebugEnabled() ) | |
{ | |
if ( ice == null ) | |
{ | |
log.debug( "nothing found in remote cache" ); | |
} | |
else | |
{ | |
log.debug( "found item in remote cache" ); | |
} | |
} | |
return ice; | |
} | |
catch ( TimeoutException te ) | |
{ | |
log.warn( "TimeoutException, Get Request timed out after " + timeout ); | |
throw new IOException( "Get Request timed out after " + timeout ); | |
} | |
catch ( InterruptedException ex ) | |
{ | |
log.warn( "InterruptedException, Get Request timed out after " + timeout ); | |
throw new IOException( "Get Request timed out after " + timeout ); | |
} | |
catch ( InvocationTargetException ex ) | |
{ | |
// assume that this is an IOException thrown by the callable. | |
log.error( "InvocationTargetException, Assuming an IO exception thrown in the background.", ex ); | |
throw new IOException( "Get Request timed out after " + timeout ); | |
} | |
} | |
/** | |
* Calls get matching on the server. Each entry in the result is unwrapped. | |
* <p> | |
* @param pattern | |
* @return Map | |
* @throws IOException | |
*/ | |
public Map processGetMatching( String pattern ) | |
throws IOException | |
{ | |
Map results = new HashMap(); | |
try | |
{ | |
Map rawResults = getRemoteCacheService().getMatching( cacheName, pattern, getListenerId() ); | |
// Eventually the instance of will not be necessary. | |
if ( rawResults != null ) | |
{ | |
Set entrySet = rawResults.entrySet(); | |
Iterator it = entrySet.iterator(); | |
while ( it.hasNext() ) | |
{ | |
Map.Entry entry = (Map.Entry) it.next(); | |
ICacheElement unwrappedResult = null; | |
if ( entry.getValue() instanceof ICacheElementSerialized ) | |
{ | |
// Never try to deserialize if you are a cluster client. Cluster | |
// clients are merely intra-remote cache communicators. Remote caches are assumed | |
// to have no ability to deserialze the objects. | |
if ( this.getRemoteCacheAttributes().getRemoteType() != IRemoteCacheAttributes.CLUSTER ) | |
{ | |
unwrappedResult = SerializationConversionUtil | |
.getDeSerializedCacheElement( (ICacheElementSerialized) entry.getValue(), | |
this.elementSerializer ); | |
} | |
} | |
else | |
{ | |
unwrappedResult = (ICacheElement) entry.getValue(); | |
} | |
results.put( entry.getKey(), unwrappedResult ); | |
} | |
} | |
} | |
catch ( Exception ex ) | |
{ | |
handleException( ex, "Failed to getMatching [" + pattern + "] from [" + cacheName + "]", | |
ICacheEventLogger.GET_EVENT ); | |
} | |
return results; | |
} | |
/** | |
* Gets multiple items from the cache based on the given set of keys. | |
* <p> | |
* @param keys | |
* @return a map of Serializable key to ICacheElement element, or an empty map if there is no | |
* data in cache for any of these keys | |
* @throws IOException | |
*/ | |
protected Map processGetMultiple( Set keys ) | |
throws IOException | |
{ | |
Map elements = new HashMap(); | |
if ( keys != null && !keys.isEmpty() ) | |
{ | |
Iterator iterator = keys.iterator(); | |
while ( iterator.hasNext() ) | |
{ | |
Serializable key = (Serializable) iterator.next(); | |
ICacheElement element = get( key ); | |
if ( element != null ) | |
{ | |
elements.put( key, element ); | |
} | |
} | |
} | |
return elements; | |
} | |
/** | |
* Synchronously remove from the remote cache; if failed, replace the remote handle with a | |
* zombie. | |
* <p> | |
* @param key | |
* @return boolean, whether or not the item was removed | |
* @throws IOException | |
*/ | |
protected boolean processRemove( Serializable key ) | |
throws IOException | |
{ | |
if ( !this.getRemoteCacheAttributes().getGetOnly() ) | |
{ | |
if ( log.isDebugEnabled() ) | |
{ | |
log.debug( "remove> key=" + key ); | |
} | |
try | |
{ | |
getRemoteCacheService().remove( cacheName, key, getListenerId() ); | |
} | |
catch ( Exception ex ) | |
{ | |
handleException( ex, "Failed to remove " + key + " from " + cacheName, ICacheEventLogger.REMOVE_EVENT ); | |
} | |
return true; | |
} | |
return false; | |
} | |
/** | |
* Synchronously removeAll from the remote cache; if failed, replace the remote handle with a | |
* zombie. | |
* <p> | |
* @throws IOException | |
*/ | |
protected void processRemoveAll() | |
throws IOException | |
{ | |
if ( !this.getRemoteCacheAttributes().getGetOnly() ) | |
{ | |
try | |
{ | |
getRemoteCacheService().removeAll( cacheName, getListenerId() ); | |
} | |
catch ( Exception ex ) | |
{ | |
handleException( ex, "Failed to remove all from " + cacheName, ICacheEventLogger.REMOVEALL_EVENT ); | |
} | |
} | |
} | |
/** | |
* Serializes the object and then calls update on the remote server with the byte array. The | |
* byte array is wrapped in a ICacheElementSerialized. This allows the remote server to operate | |
* without any knowledge of caches classes. | |
* <p> | |
* @param ce | |
* @throws IOException | |
*/ | |
protected void processUpdate( ICacheElement ce ) | |
throws IOException | |
{ | |
if ( !getRemoteCacheAttributes().getGetOnly() ) | |
{ | |
ICacheElementSerialized serialized = null; | |
try | |
{ | |
if ( log.isDebugEnabled() ) | |
{ | |
log.debug( "sending item to remote server" ); | |
} | |
// convert so we don't have to know about the object on the | |
// other end. | |
serialized = SerializationConversionUtil.getSerializedCacheElement( ce, this.elementSerializer ); | |
remoteCacheService.update( serialized, getListenerId() ); | |
} | |
catch ( NullPointerException npe ) | |
{ | |
log.error( "npe for ce = " + ce + "ce.attr = " + ce.getElementAttributes(), npe ); | |
} | |
catch ( Exception ex ) | |
{ | |
// event queue will wait and retry | |
handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName(), | |
ICacheEventLogger.UPDATE_EVENT ); | |
} | |
} | |
else | |
{ | |
if ( log.isDebugEnabled() ) | |
{ | |
log.debug( "get only mode, not sending to remote server" ); | |
} | |
} | |
} | |
/** | |
* Returns all the keys for a group. | |
* <p> | |
* @param groupName | |
* @return Set | |
* @throws java.rmi.RemoteException | |
* @throws IOException | |
*/ | |
public Set getGroupKeys( String groupName ) | |
throws java.rmi.RemoteException, IOException | |
{ | |
return getRemoteCacheService().getGroupKeys( cacheName, groupName ); | |
} | |
/** | |
* Allows other member of this package to access the listerner. This is mainly needed for | |
* deregistering a listener. | |
* <p> | |
* @return IRemoteCacheListener, the listener for this remote server | |
*/ | |
public IRemoteCacheListener getListener() | |
{ | |
return getRemoteCacheListener(); | |
} | |
/** | |
* let the remote cache set a listener_id. Since there is only one listener for all the regions | |
* and every region gets registered? the id shouldn't be set if it isn't zero. If it is we | |
* assume that it is a reconnect. | |
* <p> | |
* @param id The new listenerId value | |
*/ | |
public void setListenerId( long id ) | |
{ | |
if ( getRemoteCacheListener() != null ) | |
{ | |
try | |
{ | |
getRemoteCacheListener().setListenerId( id ); | |
if ( log.isDebugEnabled() ) | |
{ | |
log.debug( "set listenerId = " + id ); | |
} | |
} | |
catch ( Exception e ) | |
{ | |
log.error( "Problem setting listenerId", e ); | |
} | |
} | |
} | |
/** | |
* Gets the listenerId attribute of the RemoteCacheListener object | |
* <p> | |
* @return The listenerId value | |
*/ | |
public long getListenerId() | |
{ | |
if ( getRemoteCacheListener() != null ) | |
{ | |
try | |
{ | |
if ( log.isDebugEnabled() ) | |
{ | |
log.debug( "get listenerId = " + getRemoteCacheListener().getListenerId() ); | |
} | |
return getRemoteCacheListener().getListenerId(); | |
} | |
catch ( Exception e ) | |
{ | |
log.error( "Problem getting listenerId", e ); | |
} | |
} | |
return -1; | |
} | |
/** | |
* Returns the current cache size. | |
* @return The size value | |
*/ | |
public int getSize() | |
{ | |
return 0; | |
} | |
/** | |
* Custom exception handling some children. This should be used to initiate failover. | |
* <p> | |
* @param ex | |
* @param msg | |
* @param eventName | |
* @throws IOException | |
*/ | |
protected abstract void handleException( Exception ex, String msg, String eventName ) | |
throws IOException; | |
/** | |
* Gets the stats attribute of the RemoteCache object. | |
* <p> | |
* @return The stats value | |
*/ | |
public String getStats() | |
{ | |
return getStatistics().toString(); | |
} | |
/** | |
* @return IStats object | |
*/ | |
public IStats getStatistics() | |
{ | |
IStats stats = new Stats(); | |
stats.setTypeName( "AbstractRemoteAuxiliaryCache" ); | |
ArrayList elems = new ArrayList(); | |
IStatElement se = null; | |
se = new StatElement(); | |
se.setName( "Remote Type" ); | |
se.setData( this.getRemoteCacheAttributes().getRemoteTypeName() + "" ); | |
elems.add( se ); | |
if ( this.getRemoteCacheAttributes().getRemoteType() == IRemoteCacheAttributes.CLUSTER ) | |
{ | |
// something cluster specific | |
} | |
// no data gathered here | |
se = new StatElement(); | |
se.setName( "UsePoolForGet" ); | |
se.setData( "" + usePoolForGet ); | |
elems.add( se ); | |
if ( pool != null ) | |
{ | |
se = new StatElement(); | |
se.setName( "Pool Size" ); | |
se.setData( "" + pool.getPool().getPoolSize() ); | |
elems.add( se ); | |
se = new StatElement(); | |
se.setName( "Maximum Pool Size" ); | |
se.setData( "" + pool.getPool().getMaximumPoolSize() ); | |
elems.add( se ); | |
} | |
if ( getRemoteCacheService() instanceof ZombieRemoteCacheService ) | |
{ | |
se = new StatElement(); | |
se.setName( "Zombie Queue Size" ); | |
se.setData( "" + ( (ZombieRemoteCacheService) getRemoteCacheService() ).getQueueSize() ); | |
elems.add( se ); | |
} | |
// get an array and put them in the Stats object | |
IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[0] ); | |
stats.setStatElements( ses ); | |
return stats; | |
} | |
/** | |
* Returns the cache status. An error status indicates the remote connection is not available. | |
* <p> | |
* @return The status value | |
*/ | |
public int getStatus() | |
{ | |
return getRemoteCacheService() instanceof IZombie ? CacheConstants.STATUS_ERROR : CacheConstants.STATUS_ALIVE; | |
} | |
/** | |
* Replaces the current remote cache service handle with the given handle. If the current remote | |
* is a Zombie, then it propagates any events that are queued to the restored service. | |
* <p> | |
* @param restoredRemote IRemoteCacheService -- the remote server or proxy to the remote server | |
*/ | |
public void fixCache( IRemoteCacheService restoredRemote ) | |
{ | |
if ( getRemoteCacheService() != null && getRemoteCacheService() instanceof ZombieRemoteCacheService ) | |
{ | |
ZombieRemoteCacheService zombie = (ZombieRemoteCacheService) getRemoteCacheService(); | |
setRemoteCacheService( restoredRemote ); | |
try | |
{ | |
zombie.propagateEvents( restoredRemote ); | |
} | |
catch ( Exception e ) | |
{ | |
try | |
{ | |
handleException( e, "Problem propagating events from Zombie Queue to new Remote Service.", | |
"fixCache" ); | |
} | |
catch ( IOException e1 ) | |
{ | |
// swallow, since this is just expected kick back. Handle always throws | |
} | |
} | |
} | |
else | |
{ | |
setRemoteCacheService( restoredRemote ); | |
} | |
return; | |
} | |
/** | |
* Gets the cacheType attribute of the RemoteCache object | |
* @return The cacheType value | |
*/ | |
public int getCacheType() | |
{ | |
return REMOTE_CACHE; | |
} | |
/** | |
* Gets the cacheName attribute of the RemoteCache object. | |
* <p> | |
* @return The cacheName value | |
*/ | |
public String getCacheName() | |
{ | |
return cacheName; | |
} | |
/** | |
* @param remote the remote to set | |
*/ | |
protected void setRemoteCacheService( IRemoteCacheService remote ) | |
{ | |
this.remoteCacheService = remote; | |
} | |
/** | |
* @return the remote | |
*/ | |
protected IRemoteCacheService getRemoteCacheService() | |
{ | |
return remoteCacheService; | |
} | |
/** | |
* @return Returns the AuxiliaryCacheAttributes. | |
*/ | |
public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes() | |
{ | |
return getRemoteCacheAttributes(); | |
} | |
/** | |
* @param remoteCacheAttributes the remoteCacheAttributes to set | |
*/ | |
protected void setRemoteCacheAttributes( IRemoteCacheAttributes remoteCacheAttributes ) | |
{ | |
this.remoteCacheAttributes = remoteCacheAttributes; | |
} | |
/** | |
* @return the remoteCacheAttributes | |
*/ | |
protected IRemoteCacheAttributes getRemoteCacheAttributes() | |
{ | |
return remoteCacheAttributes; | |
} | |
/** | |
* @param remoteCacheListener the remoteCacheListener to set | |
*/ | |
protected void setRemoteCacheListener( IRemoteCacheListener remoteCacheListener ) | |
{ | |
this.remoteCacheListener = remoteCacheListener; | |
} | |
/** | |
* @return the remoteCacheListener | |
*/ | |
protected IRemoteCacheListener getRemoteCacheListener() | |
{ | |
return remoteCacheListener; | |
} | |
} |