blob: 899ce2c4dbffe99d2bcfaa4138f98bdf148fb0d4 [file] [log] [blame]
package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor;
import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheListener;
import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
import org.apache.commons.jcs3.engine.CacheInfo;
import org.apache.commons.jcs3.engine.behavior.ICacheElement;
import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
import org.apache.commons.jcs3.engine.control.CompositeCache;
import org.apache.commons.jcs3.io.ObjectInputStreamClassLoaderAware;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.threadpool.DaemonThreadFactory;
/**
* Listens for connections from other TCP lateral caches and handles them. The initialization method
* starts a listening thread, which creates a socket server. When messages are received they are
* passed to a pooled executor which then calls the appropriate handle method.
*/
public class LateralTCPListener<K, V>
implements ILateralCacheListener<K, V>, IShutdownObserver
{
/** Logger shared by the listener and its nested accept/handler classes. */
private static final Log log = LogManager.getLog( LateralTCPListener.class );
/**
 * Socket timeout, in milliseconds, set on the server socket in init(). It makes a blocked
 * accept() return periodically so the accept loop can re-check the terminated flag.
 * A value of 0 would block indefinitely.
 */
private static final int acceptTimeOut = 1000;
/** The cache manager that getCache(String) uses to resolve a cache name to a cache. */
private transient ICompositeCacheManager cacheManager;
/** Map of available instances, keyed by the listener port rendered as a string. */
private static final ConcurrentHashMap<String, ILateralCacheListener<?, ?>> instances =
new ConcurrentHashMap<>();
/** The thread that accepts incoming connections; created and started by init(). */
private ListenerThread receiver;
/** Configuration attributes (listener host and port, remove filtering). */
private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
/**
 * Executor that runs one ConnectionHandler per accepted connection; created by init().
 * The original note still applies: we should probably use an event queue here.
 */
private ExecutorService pooledExecutor;
/**
 * Number of puts handled. NOTE(review): incremented with a plain ++ in handlePut, which is
 * invoked from handler tasks on the pooled executor, so the value is only approximate when
 * several connections are active.
 */
private int putCnt = 0;
/** Number of removes handled; same caveat as the put count. */
private int removeCnt = 0;
/** Number of get and get-matching requests handled; same caveat as the put count. */
private int getCnt = 0;
/**
 * Use the vmid by default. This can be set for testing. If we ever need to run more than one
 * per vm, then we need a new technique.
 */
private long listenerId = CacheInfo.listenerId;
/** Set once shutdown() has been called; guards against repeated shutdown. Created by init(). */
private AtomicBoolean shutdown;
/** Polled by the accept loop; once true the listener thread exits. Created by init(). */
private AtomicBoolean terminated;
/**
 * Returns the listener registered for the configured TCP listener port, creating, wiring and
 * starting it on first use.
 * <p>
 * Instances are cached per port, so later calls for the same port return the listener that
 * was created first and ignore the cache manager passed in.
 * <p>
 * @param ilca attributes providing the listener port (and host) to bind to
 * @param cacheMgr the cache manager a newly created listener resolves cache names with
 * @return the listener for the configured port
 * @throws IllegalStateException if a new listener cannot open its server socket
 */
public static <K, V> LateralTCPListener<K, V>
    getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr )
{
    @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
    final LateralTCPListener<K, V> ins = (LateralTCPListener<K, V>) instances.computeIfAbsent(
        String.valueOf( ilca.getTcpListenerPort() ),
        k -> {
            final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca );

            // Wire the cache manager BEFORE init(): init() starts the accept thread, and a
            // connection handled right after startup would otherwise find no manager in
            // getCache() and fail with a NullPointerException.
            newIns.setCacheManager( cacheMgr );
            newIns.init();

            log.info( "Created new listener {0}",
                    () -> ilca.getTcpListenerPort() );

            return newIns;
        });

    return ins;
}
/**
 * Creates a listener configured by the given attributes. The listener does not open a socket
 * until init() is called.
 * <p>
 * A single listener serves every region; the regions merely reference it by name.
 * <p>
 * @param ilca the TCP lateral configuration to store on this listener
 */
protected LateralTCPListener( final ITCPLateralCacheAttributes ilca )
{
    setTcpLateralCacheAttributes( ilca );
}
/**
 * Opens the server socket and starts the ListenerThread on the configured port.
 * <p>
 * Creates the handler executor and resets the terminated and shutdown flags. The socket is
 * bound to the configured host when one is set, otherwise to the port on all local addresses.
 * The accept timeout is applied so the accept loop can poll the terminated flag.
 * <p>
 * @throws IllegalStateException if the host cannot be resolved or the socket cannot be opened,
 *         bound or configured; the underlying IOException is the cause
 */
@Override
public synchronized void init()
{
    ServerSocket serverSocket = null;
    try
    {
        final int port = getTcpLateralCacheAttributes().getTcpListenerPort();
        final String host = getTcpLateralCacheAttributes().getTcpListenerHost();

        pooledExecutor = Executors.newCachedThreadPool(
                new DaemonThreadFactory("JCS-LateralTCPListener-"));
        terminated = new AtomicBoolean(false);
        shutdown = new AtomicBoolean(false);

        if (host != null && host.length() > 0)
        {
            log.info( "Listening on {0}:{1}", host, port );
            // Resolve host name
            final InetAddress inetAddress = InetAddress.getByName(host);
            //Bind the SocketAddress with inetAddress and port
            final SocketAddress endPoint = new InetSocketAddress(inetAddress, port);

            serverSocket = new ServerSocket();
            serverSocket.bind(endPoint);
        }
        else
        {
            log.info( "Listening on port {0}", port );
            serverSocket = new ServerSocket( port );
        }
        serverSocket.setSoTimeout( acceptTimeOut );

        // From here on the listener thread owns the socket and closes it when it exits
        receiver = new ListenerThread(serverSocket);
        receiver.setDaemon( true );
        receiver.start();
    }
    catch ( final IOException ex )
    {
        // bind() or setSoTimeout() failed after the socket was created: release it instead
        // of leaving the descriptor open until garbage collection.
        if ( serverSocket != null )
        {
            try
            {
                serverSocket.close();
            }
            catch ( final IOException closeEx )
            {
                ex.addSuppressed( closeEx );
            }
        }
        throw new IllegalStateException( ex );
    }
}
/**
 * Stores the listener id and logs it at debug level.
 * <p>
 * The id defaults to the vm-wide value taken from CacheInfo; the setter exists so the owning
 * service (or a unit test) can assign a different one, which is why the field is not static.
 * <p>
 * This implementation performs no I/O; the throws clause is part of the overridden signature.
 * <p>
 * @param id The new listenerId value
 * @throws IOException declared by the overridden method, never thrown here
 */
@Override
public void setListenerId( final long id )
    throws IOException
{
    listenerId = id;
    log.debug( "set listenerId = {0}", id );
}
/**
 * Returns the id this listener currently identifies itself with.
 * <p>
 * @return The listenerId value
 * @throws IOException declared by the overridden method, never thrown here
 */
@Override
public long getListenerId()
    throws IOException
{
    return listenerId;
}
/**
 * Counts the put, reports the running total at info level on every 100th put, and applies the
 * element to the cache named by the element via a local update.
 * <p>
 * @param element the element received from a remote lateral
 * @throws IOException if the local update fails
 * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handlePut(org.apache.commons.jcs3.engine.behavior.ICacheElement)
 */
@Override
public void handlePut( final ICacheElement<K, V> element )
    throws IOException
{
    putCnt++;

    // Only evaluate the counter when somebody listens at info level
    final boolean reportProgress = log.isInfoEnabled() && getPutCnt() % 100 == 0;
    if ( reportProgress )
    {
        log.info( "Put Count (port {0}) = {1}",
            () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
            () -> getPutCnt() );
    }

    log.debug( "handlePut> cacheName={0}, key={1}",
        () -> element.getCacheName(), () -> element.getKey() );

    final CompositeCache<K, V> target = getCache( element.getCacheName() );
    target.localUpdate( element );
}
/**
 * Counts the remove, reports the running total at info level on every 100th remove, and
 * removes the key locally from the named cache.
 * <p>
 * @param cacheName the name of the cache to remove from
 * @param key the key to remove
 * @throws IOException declared by the overridden method
 * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemove(java.lang.String,
 *      Object)
 */
@Override
public void handleRemove( final String cacheName, final K key )
    throws IOException
{
    removeCnt++;

    final boolean reportProgress = log.isInfoEnabled() && getRemoveCnt() % 100 == 0;
    if ( reportProgress )
    {
        log.info( "Remove Count = {0}", () -> getRemoveCnt() );
    }

    log.debug( "handleRemove> cacheName={0}, key={1}", cacheName, key );

    final CompositeCache<K, V> target = getCache( cacheName );
    target.localRemove( key );
}
/**
 * Clears the named cache locally.
 * <p>
 * @param cacheName the name of the cache to clear
 * @throws IOException if the local removal fails
 * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemoveAll(java.lang.String)
 */
@Override
public void handleRemoveAll( final String cacheName )
    throws IOException
{
    log.debug( "handleRemoveAll> cacheName={0}", cacheName );

    final CompositeCache<K, V> target = getCache( cacheName );
    target.localRemoveAll();
}
/**
 * Counts the request, reports the running total at info level on every 100th request, and
 * performs a local get on the named cache.
 * <p>
 * @param cacheName the name of the cache to read from
 * @param key the key to look up
 * @return whatever the local get of the named cache returns for the key
 * @throws IOException declared for callers; see the cache implementation
 */
public ICacheElement<K, V> handleGet( final String cacheName, final K key )
    throws IOException
{
    getCnt++;

    final boolean reportProgress = log.isInfoEnabled() && getGetCnt() % 100 == 0;
    if ( reportProgress )
    {
        log.info( "Get Count (port {0}) = {1}",
            () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
            () -> getGetCnt() );
    }

    log.debug( "handleGet> cacheName={0}, key={1}", cacheName, key );

    final CompositeCache<K, V> source = getCache( cacheName );
    return source.localGet( key );
}
/**
 * Counts the request (it shares the get counter), reports the running total at info level on
 * every 100th request, and performs a local pattern lookup on the named cache.
 * <p>
 * @param cacheName the name of the cache
 * @param pattern the matching pattern
 * @return the map returned by the local matching lookup of the named cache
 * @throws IOException if the local lookup fails
 */
public Map<K, ICacheElement<K, V>> handleGetMatching( final String cacheName, final String pattern )
    throws IOException
{
    getCnt++;

    final boolean reportProgress = log.isInfoEnabled() && getGetCnt() % 100 == 0;
    if ( reportProgress )
    {
        log.info( "GetMatching Count (port {0}) = {1}",
            () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
            () -> getGetCnt() );
    }

    log.debug( "handleGetMatching> cacheName={0}, pattern={1}", cacheName, pattern );

    final CompositeCache<K, V> source = getCache( cacheName );
    return source.localGetMatching( pattern );
}
/**
 * Returns the key set of the named cache, obtained with {@code getKeySet(true)}.
 * <p>
 * @param cacheName the name of the cache
 * @return a set of keys
 * @throws IOException declared for callers; see the cache implementation
 */
public Set<K> handleGetKeySet( final String cacheName ) throws IOException
{
    final CompositeCache<K, V> source = getCache( cacheName );
    return source.getKeySet( true );
}
/**
 * Logs that the dispose message is ignored for the named cache and sets the terminated flag,
 * which the accept loop polls. No cache is disposed here.
 * <p>
 * @param cacheName the cache the dispose message refers to; only used for logging
 * @throws IOException declared by the overridden method, never thrown here
 * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleDispose(java.lang.String)
 */
@Override
public void handleDispose( final String cacheName )
    throws IOException
{
    final String message = "handleDispose > cacheName={0} | Ignoring message. "
        + "Do not dispose from remote.";
    log.info( message, cacheName );

    // TODO handle active deregistration, rather than passive detection
    terminated.set( true );
}
/**
 * Marks the listener as terminated so the accept loop exits on its next poll, notifies any
 * thread waiting on this object's monitor, and shuts the handler executor down immediately.
 */
@Override
public synchronized void dispose()
{
    this.terminated.set( true );
    this.notify();

    this.pooledExecutor.shutdownNow();
}
/**
 * Looks up the named cache through the configured cache manager.
 * <p>
 * The manager is normally set by getInstance(). There is no fallback: if no manager has been
 * set, this method fails with a NullPointerException.
 * <p>
 * @param name the name of the cache to resolve
 * @return CompositeCache
 */
protected CompositeCache<K, V> getCache( final String name )
{
    final ICompositeCacheManager manager = getCacheManager();
    return manager.getCache( name );
}
/**
 * Roughly how many updates this lateral has received so far.
 * <p>
 * @return the current put count
 */
public int getPutCnt()
{
    return this.putCnt;
}
/**
 * @return the current count of get and get-matching requests
 */
public int getGetCnt()
{
    return this.getCnt;
}
/**
 * @return the current count of remove requests
 */
public int getRemoveCnt()
{
    return this.removeCnt;
}
/**
 * Sets the cache manager that getCache(String) resolves cache names with.
 * <p>
 * @param cacheMgr The cacheMgr to set.
 */
@Override
public void setCacheManager( final ICompositeCacheManager cacheMgr )
{
    cacheManager = cacheMgr;
}
/**
 * @return the cache manager set on this listener; null if none has been set
 */
@Override
public ICompositeCacheManager getCacheManager()
{
    return this.cacheManager;
}
/**
 * Replaces the configuration attributes of this listener. Also called by the constructor.
 * <p>
 * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
 */
public void setTcpLateralCacheAttributes( final ITCPLateralCacheAttributes tcpLateralCacheAttributes )
{
    this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
}
/**
 * @return the configuration attributes this listener was created or last configured with
 */
public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
{
    return this.tcpLateralCacheAttributes;
}
/**
 * Accepts connections on the server socket and hands each one to the pooled executor as a
 * ConnectionHandler. There should be one listener for each configured TCP lateral.
 * <p>
 * The thread owns the server socket and closes it when run() returns.
 */
public class ListenerThread
    extends Thread
{
    /** The socket listener */
    private final ServerSocket serverSocket;

    /**
     * Constructor
     *
     * @param serverSocket the bound server socket to accept connections on
     */
    public ListenerThread(final ServerSocket serverSocket)
    {
        this.serverSocket = serverSocket;
    }

    /**
     * Accepts connections until the listener is marked terminated, accept() fails with an
     * IOException other than a timeout, or the executor refuses further handlers.
     */
    @SuppressWarnings("synthetic-access")
    @Override
    public void run()
    {
        try (ServerSocket ssck = serverSocket)
        {
            while ( true )
            {
                log.debug( "Waiting for clients to connect " );

                Socket socket = null;
                while ( socket == null )
                {
                    // Check to see if we've been asked to exit, and exit
                    if (terminated.get())
                    {
                        log.debug("Thread terminated, exiting gracefully");
                        return;
                    }

                    try
                    {
                        socket = ssck.accept();
                    }
                    catch (final SocketTimeoutException e)
                    {
                        // No problem! The accept timeout only exists so that the
                        // terminated flag is polled; loop back up.
                    }
                }

                if ( log.isDebugEnabled() )
                {
                    final InetAddress inetAddress = socket.getInetAddress();
                    log.debug( "Connected to client at {0}", inetAddress );
                }

                try
                {
                    pooledExecutor.execute( new ConnectionHandler( socket ) );
                }
                catch ( final java.util.concurrent.RejectedExecutionException e )
                {
                    // dispose() shut the executor down while accept() was in flight.
                    // Nobody will ever service this connection, so release it and stop
                    // instead of dying with an uncaught exception and a leaked socket.
                    log.info( "Executor rejected connection handler, closing connection "
                        + "and stopping listener.", e );
                    socket.close();
                    return;
                }
            }
        }
        catch ( final IOException e )
        {
            log.error( "Exception caught in TCP listener", e );
        }
    }
}
/**
 * Task that serves one accepted connection: it reads LateralElementDescriptors from the
 * socket until the stream ends or fails and dispatches each one to the enclosing listener.
 */
public class ConnectionHandler
    implements Runnable
{
    /** The socket connection, passed in via constructor */
    private final Socket socket;

    /**
     * Construct for a given socket
     * @param socket the accepted client connection; closed when run() finishes
     */
    public ConnectionHandler( final Socket socket )
    {
        this.socket = socket;
    }

    /**
     * Reads descriptors in a loop and handles every one that did not originate from this
     * listener (same requester id). The loop ends through an exception: end of stream and
     * socket errors are logged at info level, anything else at error level.
     * <p>
     * The socket itself is a managed resource of the try block, so it is closed even when the
     * object input stream cannot be constructed (for example on a corrupt stream header).
     */
    @Override
    @SuppressWarnings({"unchecked", // Need to cast from Object
        "synthetic-access" })
    public void run()
    {
        try (Socket connection = socket;
             ObjectInputStream ois =
                 new ObjectInputStreamClassLoaderAware( connection.getInputStream(), null ))
        {
            while ( true )
            {
                final LateralElementDescriptor<K, V> led =
                        (LateralElementDescriptor<K, V>) ois.readObject();

                if ( led == null )
                {
                    log.debug( "LateralElementDescriptor is null" );
                    continue;
                }
                if ( led.requesterId == getListenerId() )
                {
                    log.debug( "from self" );
                }
                else
                {
                    log.debug( "receiving LateralElementDescriptor from another led = {0}",
                            led );

                    handle( led );
                }
            }
        }
        catch ( final EOFException e )
        {
            log.info( "Caught EOFException, closing connection.", e );
        }
        catch ( final SocketException e )
        {
            log.info( "Caught SocketException, closing connection.", e );
        }
        catch ( final Exception e )
        {
            log.error( "Unexpected exception.", e );
        }
    }

    /**
     * This calls the appropriate method, based on the command sent in the Lateral element
     * descriptor. For the GET, GET_MATCHING and GET_KEYSET commands a non-null result is
     * written back on the socket; a null result sends nothing.
     * <p>
     * @param led the descriptor to process
     * @throws IOException if a handler or writing the response fails
     */
    @SuppressWarnings("synthetic-access")
    private void handle( final LateralElementDescriptor<K, V> led )
        throws IOException
    {
        final String cacheName = led.ce.getCacheName();
        final K key = led.ce.getKey();
        Serializable obj = null;

        switch (led.command)
        {
            case UPDATE:
                handlePut( led.ce );
                break;

            case REMOVE:
                // if a hashcode was given and filtering is on
                // check to see if they are the same
                // if so, then don't remove, otherwise issue a remove
                if ( led.valHashCode != -1 )
                {
                    if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
                    {
                        final ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
                        if ( test != null )
                        {
                            if ( test.getVal().hashCode() == led.valHashCode )
                            {
                                log.debug( "Filtering detected identical hashCode [{0}], "
                                        + "not issuing a remove for led {1}",
                                        led.valHashCode, led );
                                return;
                            }
                            else
                            {
                                log.debug( "Different hashcodes, in cache [{0}] sent [{1}]",
                                        test.getVal().hashCode(), led.valHashCode );
                            }
                        }
                    }
                }
                handleRemove( cacheName, key );
                break;

            case REMOVEALL:
                handleRemoveAll( cacheName );
                break;

            case GET:
                obj = handleGet( cacheName, key );
                break;

            case GET_MATCHING:
                obj = (Serializable) handleGetMatching( cacheName, (String) key );
                break;

            case GET_KEYSET:
                obj = (Serializable) handleGetKeySet(cacheName);
                break;

            default: break;
        }

        if (obj != null)
        {
            // Deliberately not closed: closing the stream would close the socket, which
            // run() keeps reading from.
            final ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
            oos.writeObject( obj );
            oos.flush();
        }
    }
}
/**
 * Shuts down the receiver.
 * <p>
 * Only the first call has an effect; later calls are logged at debug level and ignored. The
 * accept loop leaves solely when it observes the terminated flag, so the flag is set here in
 * addition to interrupting the thread. The loop then exits on its next poll, i.e. within one
 * accept timeout.
 */
@Override
public void shutdown()
{
    if ( shutdown.compareAndSet(false, true) )
    {
        log.info( "Shutting down TCP Lateral receiver." );

        // Interrupting alone does not stop the listener: the accept loop never checks the
        // interrupt status, and a thread blocked in ServerSocket.accept() is not guaranteed
        // to be woken by an interrupt.
        terminated.set( true );
        receiver.interrupt();
    }
    else
    {
        log.debug( "Shutdown already called." );
    }
}
}