blob: 84119e206f3ecc262d0b87410ad6808ef504c164 [file] [log] [blame]
package org.apache.commons.jcs3.auxiliary.remote;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import java.rmi.Naming;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheAttributes;
import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheClient;
import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheListener;
import org.apache.commons.jcs3.engine.CacheStatus;
import org.apache.commons.jcs3.engine.CacheWatchRepairable;
import org.apache.commons.jcs3.engine.ZombieCacheServiceNonLocal;
import org.apache.commons.jcs3.engine.ZombieCacheWatch;
import org.apache.commons.jcs3.engine.behavior.ICacheObserver;
import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
/**
* An instance of RemoteCacheManager corresponds to one remote connection of a specific host and
* port. All RemoteCacheManager instances are monitored by the singleton RemoteCacheMonitor
* monitoring daemon for error detection and recovery.
* <p>
* Getting an instance of the remote cache has the effect of getting a handle on the remote server.
* Listeners are not registered with the server until a cache is requested from the manager.
*/
public class RemoteCacheManager
{
/** The logger */
private static final Log log = LogManager.getLog( RemoteCacheManager.class );
/** Contains instances of RemoteCacheNoWait managed by a RemoteCacheManager instance. */
private final ConcurrentMap<String, RemoteCacheNoWait<?, ?>> caches =
new ConcurrentHashMap<>();
/** The event logger. */
private final ICacheEventLogger cacheEventLogger;
/** The serializer. */
private final IElementSerializer elementSerializer;
/** Handle to the remote cache service; or a zombie handle if failed to connect. */
private ICacheServiceNonLocal<?, ?> remoteService;
/**
* Wrapper of the remote cache watch service; or wrapper of a zombie service if failed to
* connect.
*/
private final CacheWatchRepairable remoteWatch;
/** The cache manager listeners will need to use to get a cache. */
private final ICompositeCacheManager cacheMgr;
/** For error notification */
private final RemoteCacheMonitor monitor;
/** The service found through lookup */
private final String registry;
/** can it be restored */
private boolean canFix = true;
/**
* Constructs an instance to with the given remote connection parameters. If the connection
* cannot be made, "zombie" services will be temporarily used until a successful re-connection
* is made by the monitoring daemon.
* <p>
* @param cattr cache attributes
* @param cacheMgr the cache hub
* @param monitor the cache monitor thread for error notifications
* @param cacheEventLogger
* @param elementSerializer
*/
protected RemoteCacheManager( IRemoteCacheAttributes cattr, ICompositeCacheManager cacheMgr,
RemoteCacheMonitor monitor,
ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer)
{
this.cacheMgr = cacheMgr;
this.monitor = monitor;
this.cacheEventLogger = cacheEventLogger;
this.elementSerializer = elementSerializer;
this.remoteWatch = new CacheWatchRepairable();
this.registry = RemoteUtils.getNamingURL(cattr.getRemoteLocation(), cattr.getRemoteServiceName());
try
{
lookupRemoteService();
}
catch (IOException e)
{
log.error("Could not find server", e);
// Notify the cache monitor about the error, and kick off the
// recovery process.
monitor.notifyError();
}
}
/**
* Lookup remote service from registry
* @throws IOException if the remote service could not be found
*
*/
protected void lookupRemoteService() throws IOException
{
log.info( "Looking up server [{0}]", registry );
try
{
Object obj = Naming.lookup( registry );
log.info( "Server found: {0}", obj );
// Successful connection to the remote server.
this.remoteService = (ICacheServiceNonLocal<?, ?>) obj;
log.debug( "Remote Service = {0}", remoteService );
remoteWatch.setCacheWatch( (ICacheObserver) remoteService );
}
catch ( Exception ex )
{
// Failed to connect to the remote server.
// Configure this RemoteCacheManager instance to use the "zombie"
// services.
this.remoteService = new ZombieCacheServiceNonLocal<>();
remoteWatch.setCacheWatch( new ZombieCacheWatch() );
throw new IOException( "Problem finding server at [" + registry + "]", ex );
}
}
/**
* Adds the remote cache listener to the underlying cache-watch service.
* <p>
* @param cattr The feature to be added to the RemoteCacheListener attribute
* @param listener The feature to be added to the RemoteCacheListener attribute
* @throws IOException
*/
public <K, V> void addRemoteCacheListener( IRemoteCacheAttributes cattr, IRemoteCacheListener<K, V> listener )
throws IOException
{
if ( cattr.isReceive() )
{
log.info( "The remote cache is configured to receive events from the remote server. "
+ "We will register a listener. remoteWatch = {0} | IRemoteCacheListener = {1}"
+ " | cacheName ", remoteWatch, listener, cattr.getCacheName() );
remoteWatch.addCacheListener( cattr.getCacheName(), listener );
}
else
{
log.info( "The remote cache is configured to NOT receive events from the remote server. "
+ "We will NOT register a listener." );
}
}
/**
* Removes a listener. When the primary recovers the failover must deregister itself for a
* region. The failover runner will call this method to de-register. We do not want to deregister
* all listeners to a remote server, in case a failover is a primary of another region. Having
* one regions failover act as another servers primary is not currently supported.
* <p>
* @param cattr
* @throws IOException
*/
public void removeRemoteCacheListener( IRemoteCacheAttributes cattr )
throws IOException
{
RemoteCacheNoWait<?, ?> cache = caches.get( cattr.getCacheName() );
if ( cache != null )
{
removeListenerFromCache(cache);
}
else
{
if ( cattr.isReceive() )
{
log.warn( "Trying to deregister Cache Listener that was never registered." );
}
else
{
log.debug( "Since the remote cache is configured to not receive, "
+ "there is no listener to deregister." );
}
}
}
// common helper method
private void removeListenerFromCache(RemoteCacheNoWait<?, ?> cache) throws IOException
{
IRemoteCacheClient<?, ?> rc = cache.getRemoteCache();
log.debug( "Found cache for [{0}], deregistering listener.",
() -> cache.getCacheName() );
// could also store the listener for a server in the manager.
IRemoteCacheListener<?, ?> listener = rc.getListener();
remoteWatch.removeCacheListener( cache.getCacheName(), listener );
}
/**
* Gets a RemoteCacheNoWait from the RemoteCacheManager. The RemoteCacheNoWait objects are
* identified by the cache name value of the RemoteCacheAttributes object.
* <p>
* If the client is configured to register a listener, this call results on a listener being
* created if one isn't already registered with the remote cache for this region.
* <p>
* @param cattr
* @return The cache value
*/
@SuppressWarnings("unchecked") // Need to cast because of common map for all caches
public <K, V> RemoteCacheNoWait<K, V> getCache( IRemoteCacheAttributes cattr )
{
RemoteCacheNoWait<K, V> remoteCacheNoWait =
(RemoteCacheNoWait<K, V>) caches.computeIfAbsent(cattr.getCacheName(), key -> {
return newRemoteCacheNoWait(cattr);
});
// might want to do some listener sanity checking here.
return remoteCacheNoWait;
}
/**
* Create new RemoteCacheNoWait instance
*
* @param cattr the cache configuration
* @return the instance
*/
protected <K, V> RemoteCacheNoWait<K, V> newRemoteCacheNoWait(IRemoteCacheAttributes cattr)
{
RemoteCacheNoWait<K, V> remoteCacheNoWait;
// create a listener first and pass it to the remotecache
// sender.
RemoteCacheListener<K, V> listener = null;
try
{
listener = new RemoteCacheListener<>( cattr, cacheMgr, elementSerializer );
addRemoteCacheListener( cattr, listener );
}
catch ( Exception e )
{
log.error( "Problem adding listener. RemoteCacheListener = {0}",
listener, e );
}
IRemoteCacheClient<K, V> remoteCacheClient =
new RemoteCache<>( cattr, (ICacheServiceNonLocal<K, V>) remoteService, listener, monitor );
remoteCacheClient.setCacheEventLogger( cacheEventLogger );
remoteCacheClient.setElementSerializer( elementSerializer );
remoteCacheNoWait = new RemoteCacheNoWait<>( remoteCacheClient );
remoteCacheNoWait.setCacheEventLogger( cacheEventLogger );
remoteCacheNoWait.setElementSerializer( elementSerializer );
return remoteCacheNoWait;
}
/** Shutdown all. */
public void release()
{
for (RemoteCacheNoWait<?, ?> c : caches.values())
{
try
{
log.info( "freeCache [{0}]", () -> c.getCacheName() );
removeListenerFromCache(c);
c.dispose();
}
catch ( IOException ex )
{
log.error( "Problem releasing {0}", c.getCacheName(), ex );
}
}
caches.clear();
}
/**
* Fixes up all the caches managed by this cache manager.
*/
public void fixCaches()
{
if ( !canFix )
{
return;
}
log.info( "Fixing caches. ICacheServiceNonLocal {0} | IRemoteCacheObserver {1}",
remoteService, remoteWatch );
for (RemoteCacheNoWait<?, ?> c : caches.values())
{
if (c.getStatus() == CacheStatus.ERROR)
{
c.fixCache( remoteService );
}
}
if ( log.isInfoEnabled() )
{
String msg = "Remote connection to " + registry + " resumed.";
if ( cacheEventLogger != null )
{
cacheEventLogger.logApplicationEvent( "RemoteCacheManager", "fix", msg );
}
log.info( msg );
}
}
/**
* Returns true if the connection to the remote host can be
* successfully re-established.
* <p>
* @return true if we found a failover server
*/
public boolean canFixCaches()
{
try
{
lookupRemoteService();
}
catch (IOException e)
{
log.error("Could not find server", e);
canFix = false;
}
return canFix;
}
}