| package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.ObjectInputStream; |
| import java.io.ObjectOutputStream; |
| import java.io.Serializable; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.ServerSocket; |
| import java.net.Socket; |
| import java.net.SocketAddress; |
| import java.net.SocketException; |
| import java.net.SocketTimeoutException; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor; |
| import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheListener; |
| import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes; |
| import org.apache.commons.jcs3.engine.CacheInfo; |
| import org.apache.commons.jcs3.engine.behavior.ICacheElement; |
| import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager; |
| import org.apache.commons.jcs3.engine.behavior.IShutdownObserver; |
| import org.apache.commons.jcs3.engine.control.CompositeCache; |
| import org.apache.commons.jcs3.io.ObjectInputStreamClassLoaderAware; |
| import org.apache.commons.jcs3.log.Log; |
| import org.apache.commons.jcs3.log.LogManager; |
| import org.apache.commons.jcs3.utils.threadpool.DaemonThreadFactory; |
| |
| /** |
| * Listens for connections from other TCP lateral caches and handles them. The initialization method |
| * starts a listening thread, which creates a socket server. When messages are received they are |
| * passed to a pooled executor which then calls the appropriate handle method. |
| */ |
| public class LateralTCPListener<K, V> |
| implements ILateralCacheListener<K, V>, IShutdownObserver |
| { |
| /** The logger */ |
| private static final Log log = LogManager.getLog( LateralTCPListener.class ); |
| |
| /** How long the server will block on an accept(). 0 is infinite. */ |
| private static final int acceptTimeOut = 1000; |
| |
| /** The CacheHub this listener is associated with */ |
| private transient ICompositeCacheManager cacheManager; |
| |
| /** Map of available instances, keyed by port */ |
| private static final ConcurrentHashMap<String, ILateralCacheListener<?, ?>> instances = |
| new ConcurrentHashMap<>(); |
| |
| /** The socket listener */ |
| private ListenerThread receiver; |
| |
| /** Configuration attributes */ |
| private ITCPLateralCacheAttributes tcpLateralCacheAttributes; |
| |
| /** The processor. We should probably use an event queue here. */ |
| private ExecutorService pooledExecutor; |
| |
| /** put count */ |
| private int putCnt = 0; |
| |
| /** remove count */ |
| private int removeCnt = 0; |
| |
| /** get count */ |
| private int getCnt = 0; |
| |
| /** |
| * Use the vmid by default. This can be set for testing. If we ever need to run more than one |
| * per vm, then we need a new technique. |
| */ |
| private long listenerId = CacheInfo.listenerId; |
| |
| /** is this shut down? */ |
| private AtomicBoolean shutdown; |
| |
| /** is this terminated? */ |
| private AtomicBoolean terminated; |
| |
| /** |
| * Gets the instance attribute of the LateralCacheTCPListener class. |
| * <p> |
| * @param ilca ITCPLateralCacheAttributes |
| * @param cacheMgr |
| * @return The instance value |
| */ |
| public static <K, V> LateralTCPListener<K, V> |
| getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr ) |
| { |
| @SuppressWarnings("unchecked") // Need to cast because of common map for all instances |
| final |
| LateralTCPListener<K, V> ins = (LateralTCPListener<K, V>) instances.computeIfAbsent( |
| String.valueOf( ilca.getTcpListenerPort() ), |
| k -> { |
| final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca ); |
| |
| newIns.init(); |
| newIns.setCacheManager( cacheMgr ); |
| |
| log.info( "Created new listener {0}", |
| () -> ilca.getTcpListenerPort() ); |
| |
| return newIns; |
| }); |
| |
| return ins; |
| } |
| |
| /** |
| * Only need one since it does work for all regions, just reference by multiple region names. |
| * <p> |
| * @param ilca |
| */ |
| protected LateralTCPListener( final ITCPLateralCacheAttributes ilca ) |
| { |
| this.setTcpLateralCacheAttributes( ilca ); |
| } |
| |
| /** |
| * This starts the ListenerThread on the specified port. |
| */ |
| @Override |
| public synchronized void init() |
| { |
| try |
| { |
| final int port = getTcpLateralCacheAttributes().getTcpListenerPort(); |
| final String host = getTcpLateralCacheAttributes().getTcpListenerHost(); |
| |
| pooledExecutor = Executors.newCachedThreadPool( |
| new DaemonThreadFactory("JCS-LateralTCPListener-")); |
| terminated = new AtomicBoolean(false); |
| shutdown = new AtomicBoolean(false); |
| |
| final ServerSocket serverSocket; |
| if (host != null && host.length() > 0) |
| { |
| log.info( "Listening on {0}:{1}", host, port ); |
| // Resolve host name |
| final InetAddress inetAddress = InetAddress.getByName(host); |
| //Bind the SocketAddress with inetAddress and port |
| final SocketAddress endPoint = new InetSocketAddress(inetAddress, port); |
| |
| serverSocket = new ServerSocket(); |
| serverSocket.bind(endPoint); |
| } |
| else |
| { |
| log.info( "Listening on port {0}", port ); |
| serverSocket = new ServerSocket( port ); |
| } |
| serverSocket.setSoTimeout( acceptTimeOut ); |
| |
| receiver = new ListenerThread(serverSocket); |
| receiver.setDaemon( true ); |
| receiver.start(); |
| } |
| catch ( final IOException ex ) |
| { |
| throw new IllegalStateException( ex ); |
| } |
| } |
| |
| /** |
| * Let the lateral cache set a listener_id. Since there is only one listener for all the |
| * regions and every region gets registered? the id shouldn't be set if it isn't zero. If it is |
| * we assume that it is a reconnect. |
| * <p> |
| * By default, the listener id is the vmid. |
| * <p> |
| * The service should set this value. This value will never be changed by a server we connect |
| * to. It needs to be non static, for unit tests. |
| * <p> |
| * The service will use the value it sets in all send requests to the sender. |
| * <p> |
| * @param id The new listenerId value |
| * @throws IOException |
| */ |
| @Override |
| public void setListenerId( final long id ) |
| throws IOException |
| { |
| this.listenerId = id; |
| log.debug( "set listenerId = {0}", id ); |
| } |
| |
| /** |
| * Gets the listenerId attribute of the LateralCacheTCPListener object |
| * <p> |
| * @return The listenerId value |
| * @throws IOException |
| */ |
| @Override |
| public long getListenerId() |
| throws IOException |
| { |
| return this.listenerId; |
| } |
| |
| /** |
| * Increments the put count. Gets the cache that was injected by the lateral factory. Calls put |
| * on the cache. |
| * <p> |
| * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handlePut(org.apache.commons.jcs3.engine.behavior.ICacheElement) |
| */ |
| @Override |
| public void handlePut( final ICacheElement<K, V> element ) |
| throws IOException |
| { |
| putCnt++; |
| if ( log.isInfoEnabled() && getPutCnt() % 100 == 0 ) |
| { |
| log.info( "Put Count (port {0}) = {1}", |
| () -> getTcpLateralCacheAttributes().getTcpListenerPort(), |
| () -> getPutCnt() ); |
| } |
| |
| log.debug( "handlePut> cacheName={0}, key={1}", |
| () -> element.getCacheName(), () -> element.getKey() ); |
| |
| getCache( element.getCacheName() ).localUpdate( element ); |
| } |
| |
| /** |
| * Increments the remove count. Gets the cache that was injected by the lateral factory. Calls |
| * remove on the cache. |
| * <p> |
| * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemove(java.lang.String, |
| * Object) |
| */ |
| @Override |
| public void handleRemove( final String cacheName, final K key ) |
| throws IOException |
| { |
| removeCnt++; |
| if ( log.isInfoEnabled() && getRemoveCnt() % 100 == 0 ) |
| { |
| log.info( "Remove Count = {0}", () -> getRemoveCnt() ); |
| } |
| |
| log.debug( "handleRemove> cacheName={0}, key={1}", cacheName, key ); |
| |
| getCache( cacheName ).localRemove( key ); |
| } |
| |
| /** |
| * Gets the cache that was injected by the lateral factory. Calls removeAll on the cache. |
| * <p> |
| * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemoveAll(java.lang.String) |
| */ |
| @Override |
| public void handleRemoveAll( final String cacheName ) |
| throws IOException |
| { |
| log.debug( "handleRemoveAll> cacheName={0}", cacheName ); |
| |
| getCache( cacheName ).localRemoveAll(); |
| } |
| |
| /** |
| * Gets the cache that was injected by the lateral factory. Calls get on the cache. |
| * <p> |
| * @param cacheName |
| * @param key |
| * @return a ICacheElement |
| * @throws IOException |
| */ |
| public ICacheElement<K, V> handleGet( final String cacheName, final K key ) |
| throws IOException |
| { |
| getCnt++; |
| if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 ) |
| { |
| log.info( "Get Count (port {0}) = {1}", |
| () -> getTcpLateralCacheAttributes().getTcpListenerPort(), |
| () -> getGetCnt() ); |
| } |
| |
| log.debug( "handleGet> cacheName={0}, key={1}", cacheName, key ); |
| |
| return getCache( cacheName ).localGet( key ); |
| } |
| |
| /** |
| * Gets the cache that was injected by the lateral factory. Calls get on the cache. |
| * <p> |
| * @param cacheName the name of the cache |
| * @param pattern the matching pattern |
| * @return Map |
| * @throws IOException |
| */ |
| public Map<K, ICacheElement<K, V>> handleGetMatching( final String cacheName, final String pattern ) |
| throws IOException |
| { |
| getCnt++; |
| if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 ) |
| { |
| log.info( "GetMatching Count (port {0}) = {1}", |
| () -> getTcpLateralCacheAttributes().getTcpListenerPort(), |
| () -> getGetCnt() ); |
| } |
| |
| log.debug( "handleGetMatching> cacheName={0}, pattern={1}", cacheName, pattern ); |
| |
| return getCache( cacheName ).localGetMatching( pattern ); |
| } |
| |
| /** |
| * Gets the cache that was injected by the lateral factory. Calls getKeySet on the cache. |
| * <p> |
| * @param cacheName the name of the cache |
| * @return a set of keys |
| * @throws IOException |
| */ |
| public Set<K> handleGetKeySet( final String cacheName ) throws IOException |
| { |
| return getCache( cacheName ).getKeySet(true); |
| } |
| |
| /** |
| * This marks this instance as terminated. |
| * <p> |
| * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleDispose(java.lang.String) |
| */ |
| @Override |
| public void handleDispose( final String cacheName ) |
| throws IOException |
| { |
| log.info( "handleDispose > cacheName={0} | Ignoring message. " |
| + "Do not dispose from remote.", cacheName ); |
| |
| // TODO handle active deregistration, rather than passive detection |
| terminated.set(true); |
| } |
| |
| @Override |
| public synchronized void dispose() |
| { |
| terminated.set(true); |
| notify(); |
| |
| pooledExecutor.shutdownNow(); |
| } |
| |
| /** |
| * Gets the cacheManager attribute of the LateralCacheTCPListener object. |
| * <p> |
| * Normally this is set by the factory. If it wasn't set the listener defaults to the expected |
| * singleton behavior of the cache manager. |
| * <p> |
| * @param name |
| * @return CompositeCache |
| */ |
| protected CompositeCache<K, V> getCache( final String name ) |
| { |
| return getCacheManager().getCache( name ); |
| } |
| |
| /** |
| * This is roughly the number of updates the lateral has received. |
| * <p> |
| * @return Returns the putCnt. |
| */ |
| public int getPutCnt() |
| { |
| return putCnt; |
| } |
| |
| /** |
| * @return Returns the getCnt. |
| */ |
| public int getGetCnt() |
| { |
| return getCnt; |
| } |
| |
| /** |
| * @return Returns the removeCnt. |
| */ |
| public int getRemoveCnt() |
| { |
| return removeCnt; |
| } |
| |
| /** |
| * @param cacheMgr The cacheMgr to set. |
| */ |
| @Override |
| public void setCacheManager( final ICompositeCacheManager cacheMgr ) |
| { |
| this.cacheManager = cacheMgr; |
| } |
| |
| /** |
| * @return Returns the cacheMgr. |
| */ |
| @Override |
| public ICompositeCacheManager getCacheManager() |
| { |
| return cacheManager; |
| } |
| |
| /** |
| * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set. |
| */ |
| public void setTcpLateralCacheAttributes( final ITCPLateralCacheAttributes tcpLateralCacheAttributes ) |
| { |
| this.tcpLateralCacheAttributes = tcpLateralCacheAttributes; |
| } |
| |
| /** |
| * @return Returns the tcpLateralCacheAttributes. |
| */ |
| public ITCPLateralCacheAttributes getTcpLateralCacheAttributes() |
| { |
| return tcpLateralCacheAttributes; |
| } |
| |
| /** |
| * Processes commands from the server socket. There should be one listener for each configured |
| * TCP lateral. |
| */ |
| public class ListenerThread |
| extends Thread |
| { |
| /** The socket listener */ |
| private final ServerSocket serverSocket; |
| |
| /** |
| * Constructor |
| * |
| * @param serverSocket |
| */ |
| public ListenerThread(final ServerSocket serverSocket) |
| { |
| this.serverSocket = serverSocket; |
| } |
| |
| /** Main processing method for the ListenerThread object */ |
| @SuppressWarnings("synthetic-access") |
| @Override |
| public void run() |
| { |
| try (ServerSocket ssck = serverSocket) |
| { |
| ConnectionHandler handler; |
| |
| outer: while ( true ) |
| { |
| log.debug( "Waiting for clients to connect " ); |
| |
| Socket socket = null; |
| inner: while (true) |
| { |
| // Check to see if we've been asked to exit, and exit |
| if (terminated.get()) |
| { |
| log.debug("Thread terminated, exiting gracefully"); |
| break outer; |
| } |
| |
| try |
| { |
| socket = ssck.accept(); |
| break inner; |
| } |
| catch (final SocketTimeoutException e) |
| { |
| // No problem! We loop back up! |
| continue inner; |
| } |
| } |
| |
| if ( socket != null && log.isDebugEnabled() ) |
| { |
| final InetAddress inetAddress = socket.getInetAddress(); |
| log.debug( "Connected to client at {0}", inetAddress ); |
| } |
| |
| handler = new ConnectionHandler( socket ); |
| pooledExecutor.execute( handler ); |
| } |
| } |
| catch ( final IOException e ) |
| { |
| log.error( "Exception caught in TCP listener", e ); |
| } |
| } |
| } |
| |
| /** |
| * A Separate thread that runs when a command comes into the LateralTCPReceiver. |
| */ |
| public class ConnectionHandler |
| implements Runnable |
| { |
| /** The socket connection, passed in via constructor */ |
| private final Socket socket; |
| |
| /** |
| * Construct for a given socket |
| * @param socket |
| */ |
| public ConnectionHandler( final Socket socket ) |
| { |
| this.socket = socket; |
| } |
| |
| /** |
| * Main processing method for the LateralTCPReceiverConnection object |
| */ |
| @Override |
| @SuppressWarnings({"unchecked", // Need to cast from Object |
| "synthetic-access" }) |
| public void run() |
| { |
| try (ObjectInputStream ois = |
| new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null )) |
| { |
| while ( true ) |
| { |
| final LateralElementDescriptor<K, V> led = |
| (LateralElementDescriptor<K, V>) ois.readObject(); |
| |
| if ( led == null ) |
| { |
| log.debug( "LateralElementDescriptor is null" ); |
| continue; |
| } |
| if ( led.requesterId == getListenerId() ) |
| { |
| log.debug( "from self" ); |
| } |
| else |
| { |
| log.debug( "receiving LateralElementDescriptor from another led = {0}", |
| led ); |
| |
| handle( led ); |
| } |
| } |
| } |
| catch ( final EOFException e ) |
| { |
| log.info( "Caught EOFException, closing connection.", e ); |
| } |
| catch ( final SocketException e ) |
| { |
| log.info( "Caught SocketException, closing connection.", e ); |
| } |
| catch ( final Exception e ) |
| { |
| log.error( "Unexpected exception.", e ); |
| } |
| } |
| |
| /** |
| * This calls the appropriate method, based on the command sent in the Lateral element |
| * descriptor. |
| * <p> |
| * @param led |
| * @throws IOException |
| */ |
| @SuppressWarnings("synthetic-access") |
| private void handle( final LateralElementDescriptor<K, V> led ) |
| throws IOException |
| { |
| final String cacheName = led.ce.getCacheName(); |
| final K key = led.ce.getKey(); |
| Serializable obj = null; |
| |
| switch (led.command) |
| { |
| case UPDATE: |
| handlePut( led.ce ); |
| break; |
| |
| case REMOVE: |
| // if a hashcode was given and filtering is on |
| // check to see if they are the same |
| // if so, then don't remove, otherwise issue a remove |
| if ( led.valHashCode != -1 ) |
| { |
| if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() ) |
| { |
| final ICacheElement<K, V> test = getCache( cacheName ).localGet( key ); |
| if ( test != null ) |
| { |
| if ( test.getVal().hashCode() == led.valHashCode ) |
| { |
| log.debug( "Filtering detected identical hashCode [{0}], " |
| + "not issuing a remove for led {1}", |
| led.valHashCode, led ); |
| return; |
| } |
| else |
| { |
| log.debug( "Different hashcodes, in cache [{0}] sent [{1}]", |
| test.getVal().hashCode(), led.valHashCode ); |
| } |
| } |
| } |
| } |
| handleRemove( cacheName, key ); |
| break; |
| |
| case REMOVEALL: |
| handleRemoveAll( cacheName ); |
| break; |
| |
| case GET: |
| obj = handleGet( cacheName, key ); |
| break; |
| |
| case GET_MATCHING: |
| obj = (Serializable) handleGetMatching( cacheName, (String) key ); |
| break; |
| |
| case GET_KEYSET: |
| obj = (Serializable) handleGetKeySet(cacheName); |
| break; |
| |
| default: break; |
| } |
| |
| if (obj != null) |
| { |
| final ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() ); |
| oos.writeObject( obj ); |
| oos.flush(); |
| } |
| } |
| } |
| |
| /** |
| * Shuts down the receiver. |
| */ |
| @Override |
| public void shutdown() |
| { |
| if ( shutdown.compareAndSet(false, true) ) |
| { |
| log.info( "Shutting down TCP Lateral receiver." ); |
| |
| receiver.interrupt(); |
| } |
| else |
| { |
| log.debug( "Shutdown already called." ); |
| } |
| } |
| } |