| package org.apache.jcs.auxiliary.javagroups; |
| |
| |
| /* |
| * Copyright 2001-2004 The Apache Software Foundation. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License") |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.jcs.auxiliary.AuxiliaryCache; |
| import org.apache.jcs.engine.CacheConstants; |
| import org.apache.jcs.engine.CacheElement; |
| import org.apache.jcs.engine.behavior.ICacheElement; |
| import org.apache.jcs.engine.behavior.ICacheType; |
| import org.apache.jcs.engine.control.CompositeCache; |
| import org.apache.jcs.engine.stats.StatElement; |
| import org.apache.jcs.engine.stats.Stats; |
| import org.apache.jcs.engine.stats.behavior.IStatElement; |
| import org.apache.jcs.engine.stats.behavior.IStats; |
| import org.jgroups.Channel; |
| import org.jgroups.Message; |
| import org.jgroups.View; |
| import org.jgroups.Address; |
| import org.jgroups.MembershipListener; |
| import org.jgroups.util.RspList; |
| import org.jgroups.blocks.RequestHandler; |
| import org.jgroups.blocks.GroupRequest; |
| import org.jgroups.blocks.MessageDispatcher; |
| |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.Vector; |
| |
| /** |
| * Auxiliary cache using javagroups. Expects to be created with a Channel, |
| * the {@link JavaGroupsCacheFactory} is responsible for creating that channel. |
| * To do so it uses configuration properties specified by an instance of |
| * {@link JavaGroupsCacheAttributes}. |
| * <p> |
| * At creation time the provided channel is connected to a group having the |
| * same name as the cache / region name this auxiliary is associated with. |
| * update / remove / removeAll operations are broadcast to all members of the |
| * group. A listener thread processes requests from other members of the group, |
| * and dispatches to appropriate methods on the associated CompositeCache. </p> |
| * <p> |
| * Calls to get are currently ignored. |
| * <p> |
| * Messages are sent to peers asynchronously. Synchronous messaging could be |
| * added using MessageDispatcher or RpcDispatcher. Combined with a get |
| * implementation this could provide much higher cache consistency (but with |
| * a substantial speed penalty). |
| * |
| * @version $Id$ |
| */ |
| public class JavaGroupsCache |
| implements AuxiliaryCache, RequestHandler, MembershipListener |
| { |
| private final Log log = LogFactory.getLog( JavaGroupsCache.class ); |
| |
| private String cacheName; |
| private int status; |
| |
| private boolean getFromPeers; |
| |
| private CompositeCache cache; |
| |
| private Channel channel; |
| |
| private MessageDispatcher dispatcher; |
| |
| public JavaGroupsCache( CompositeCache cache, |
| Channel channel, |
| boolean getFromPeers ) |
| throws Exception |
| { |
| this.cache = cache; |
| |
| this.cacheName = cache.getCacheName(); |
| this.channel = channel; |
| |
| this.getFromPeers = getFromPeers; |
| |
| // The adapter listens to the channel and fires MessageListener events |
| // on this object. |
| |
| dispatcher = new MessageDispatcher( channel, null, this, this ); |
| |
| // Connect channel to the 'group' for our region name |
| |
| channel.setOpt( Channel.LOCAL, Boolean.FALSE ); |
| |
| channel.connect( cacheName ); |
| |
| // If all the above succeed, the cache is now alive. |
| |
| this.status = CacheConstants.STATUS_ALIVE; |
| |
| log.info( "Initialized for cache: " + cacheName ); |
| } |
| |
| public void send( ICacheElement element, int command ) |
| { |
| Request request = new Request( element, command ); |
| |
| try |
| { |
| dispatcher.castMessage( null, |
| new Message( null, null, request ), |
| GroupRequest.GET_NONE, |
| 0 ); |
| |
| } |
| catch ( Exception e ) |
| { |
| log.error( "Failed to send JavaGroups message", e ); |
| } |
| } |
| |
| // ----------------------------------------------- interface AuxiliaryCache |
| |
| /** |
| * Sends the provided element to all peers (connected to the same channel |
| * and region name). |
| * |
| * @param ce CacheElement to replicate |
| * @throws IOException Never thrown by this implementation |
| */ |
| public void update( ICacheElement ce ) throws IOException |
| { |
| send( ce, Request.UPDATE ); |
| } |
| |
| /** |
| * If 'getFromPeers' is true, this will attempt to get the requested |
| * element from ant other members of the group. |
| * |
| * @param key |
| * @return |
| * @throws IOException Never thrown by this implementation |
| */ |
| public ICacheElement get( Serializable key ) throws IOException |
| { |
| if ( getFromPeers ) |
| { |
| CacheElement element = new CacheElement( cacheName, key, null ); |
| |
| Request request = new Request( element, Request.GET ); |
| |
| // Cast message and wait for all responses. |
| |
| // FIXME: we can stop waiting after the first not null response, |
| // that is more difficult to implement however. |
| |
| RspList responses = |
| dispatcher.castMessage( null, |
| new Message( null, null, request ), |
| GroupRequest.GET_ALL, |
| 0 ); |
| |
| // Get results only gives the responses which were not null |
| |
| Vector results = responses.getResults(); |
| |
| // If there were any non null results, return the first |
| |
| if ( results.size() > 0 ) |
| { |
| return ( ICacheElement ) results.get( 0 ); |
| } |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Sends a request to all peers to remove the element having the provided |
| * key. |
| * |
| * @param key Key of element to be removed |
| * @throws IOException Never thrown by this implementation |
| */ |
| public boolean remove( Serializable key ) throws IOException |
| { |
| CacheElement ce = new CacheElement( cacheName, key, null ); |
| |
| send( ce, Request.REMOVE ); |
| |
| return false; |
| } |
| |
| /** |
| * Sends a request to remove ALL elements from the peers |
| * |
| * @throws IOException Never thrown by this implementation |
| */ |
| public void removeAll() throws IOException |
| { |
| CacheElement ce = new CacheElement( cacheName, null, null ); |
| |
| send( ce, Request.REMOVE_ALL ); |
| } |
| |
| /** |
| * Dispose this cache, terminates the listener thread and disconnects the |
| * channel from the group. |
| * |
| * @throws IOException |
| */ |
| public void dispose() throws IOException |
| { |
| // This will join the scheduler thread and ensure everything terminates |
| |
| dispatcher.stop(); |
| |
| // Now we can disconnect from the group and close the channel |
| |
| channel.disconnect(); |
| channel.close(); |
| |
| status = CacheConstants.STATUS_DISPOSED; |
| |
| log.info( "Disposed for cache: " + cacheName ); |
| } |
| |
| /** |
| * Since this is a lateral, size is not defined. |
| * |
| * @return Always returns 0 |
| */ |
| public int getSize() |
| { |
| return 0; |
| } |
| |
| /** |
| * Returns the status of this auxiliary. |
| * |
| * @return One of the status constants from {@link CacheConstants} |
| */ |
| public int getStatus() |
| { |
| return status; |
| } |
| |
| /** |
| * Accessor for cacheName property |
| * |
| * @return Name of cache / region this auxiliary is associated with. |
| */ |
| public String getCacheName() |
| { |
| return cacheName; |
| } |
| |
| /** |
| * Not implemented (I believe since get is not supported, this should also |
| * not be). |
| * |
| * @param group Ignored |
| * @return Always reurns null |
| */ |
| public Set getGroupKeys( String group ) |
| { |
| return null; |
| } |
| |
| // --------------------------------------------------- interface ICacheType |
| |
| /** |
| * Get the cache type (always Lateral). |
| * |
| * @return Always returns ICacheType.LATERAL_CACHE |
| */ |
| public int getCacheType() |
| { |
| return ICacheType.LATERAL_CACHE; |
| } |
| |
| // ----------------------------------------------- interface RequestHandler |
| |
| /** |
| * Handles a message from a peer. The message should contain a Request, |
| * and depending on the command this will call localUpdate, localRemove, |
| * or localRemoveAll on the associated CompositeCache. |
| * |
| * @param msg The JavaGroups Message |
| * @return Always returns null |
| */ |
| public Object handle( Message msg ) |
| { |
| try |
| { |
| Request request = ( Request ) msg.getObject(); |
| |
| // Switch based on the command and invoke the |
| // appropriate method on the associate composite cache |
| |
| switch ( request.getCommand() ) |
| { |
| case Request.GET: |
| |
| return cache.localGet( request.getCacheElement().getKey() ); |
| // break; |
| |
| case Request.UPDATE: |
| |
| cache.localUpdate( request.getCacheElement() ); |
| break; |
| |
| case Request.REMOVE: |
| |
| cache.localRemove( request.getCacheElement().getKey() ); |
| break; |
| |
| case Request.REMOVE_ALL: |
| |
| cache.localRemoveAll(); |
| break; |
| |
| default: |
| |
| log.error( "Recieved unknown command" ); |
| } |
| } |
| catch ( Exception e ) |
| { |
| log.error( "Failed to process received JavaGroups message", e ); |
| } |
| |
| return null; |
| } |
| |
| // ------------------------------------------- interface MembershipListener |
| |
| public void viewAccepted( View view ) |
| { |
| log.info( "View Changed: " + String.valueOf( view ) ); |
| } |
| |
| public void suspect( Address suspectedAddress ) { } |
| |
| public void block() { } |
| |
| /** |
| * getStats |
| * |
| * @return String |
| */ |
| public String getStats() |
| { |
| return getStatistics().toString(); |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.jcs.auxiliary.AuxiliaryCache#getStatistics() |
| */ |
| public IStats getStatistics() |
| { |
| IStats stats = new Stats(); |
| stats.setTypeName( "JavaGroups Cache" ); |
| |
| ArrayList elems = new ArrayList(); |
| |
| IStatElement se = null; |
| |
| // no data gathered here |
| |
| // get an array and put them in the Stats object |
| IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[0] ); |
| stats.setStatElements( ses ); |
| |
| return stats; |
| } |
| |
| // ---------------------------------------------------------- inner classes |
| |
| /** |
| * Object for messages, wraps the command type (update, remove, or remove |
| * all) and original cache element to distribute. |
| */ |
| static class Request implements Serializable |
| { |
| public final static int UPDATE = 1; |
| public final static int REMOVE = 2; |
| public final static int REMOVE_ALL = 3; |
| public final static int GET = 5; |
| |
| private ICacheElement cacheElement; |
| private int command; |
| |
| public Request( ICacheElement cacheElement, int command ) |
| { |
| this.cacheElement = cacheElement; |
| this.command = command; |
| } |
| |
| public ICacheElement getCacheElement() |
| { |
| return cacheElement; |
| } |
| |
| public int getCommand() |
| { |
| return command; |
| } |
| } |
| } |