| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.catalina.tribes.tipis; |
| |
| import java.io.IOException; |
| import java.io.ObjectInput; |
| import java.io.ObjectOutput; |
| import java.io.Serializable; |
| import java.io.UnsupportedEncodingException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedHashSet; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.catalina.tribes.Channel; |
| import org.apache.catalina.tribes.ChannelException; |
| import org.apache.catalina.tribes.ChannelListener; |
| import org.apache.catalina.tribes.Heartbeat; |
| import org.apache.catalina.tribes.Member; |
| import org.apache.catalina.tribes.MembershipListener; |
| import org.apache.catalina.tribes.group.Response; |
| import org.apache.catalina.tribes.group.RpcCallback; |
| import org.apache.catalina.tribes.group.RpcChannel; |
| import org.apache.catalina.tribes.io.XByteBuffer; |
| import org.apache.catalina.tribes.membership.MemberImpl; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.catalina.tribes.util.Arrays; |
| import java.util.ConcurrentModificationException; |
| |
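/**
 * <p>Title: AbstractReplicatedMap</p>
 *
 * <p>Description: Base class for a <code>LinkedHashMap</code> whose entries are shared
 * between the members of a Tribes channel. Every value is stored wrapped in a
 * <code>MapEntry</code> that records whether the local copy is the primary, a backup
 * or a proxy for the key. Subclasses decide where entries are published by
 * implementing <code>publishEntryInfo</code>.</p>
 *
 * <p>Copyright: Copyright (c) 2005</p>
 *
 * <p>Company: </p>
 *
 * @author not attributable
 * @version 1.0
 */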
| public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener, Heartbeat { |
| protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class); |
| |
| /** |
| * The default initial capacity - MUST be a power of two. |
| */ |
| public static final int DEFAULT_INITIAL_CAPACITY = 16; |
| |
| /** |
| * The load factor used when none specified in constructor. |
| **/ |
| public static final float DEFAULT_LOAD_FACTOR = 0.75f; |
| |
| /** |
| * Used to identify the map |
| */ |
| final String chset = "ISO-8859-1"; |
| |
| //------------------------------------------------------------------------------ |
| // INSTANCE VARIABLES |
| //------------------------------------------------------------------------------ |
| private transient long rpcTimeout = 5000; |
| private transient Channel channel; |
| private transient RpcChannel rpcChannel; |
| private transient byte[] mapContextName; |
| private transient boolean stateTransferred = false; |
| private transient Object stateMutex = new Object(); |
| private transient HashMap mapMembers = new HashMap(); |
| private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT; |
| private transient Object mapOwner; |
| private transient ClassLoader[] externalLoaders; |
| protected transient int currentNode = 0; |
| private transient long accessTimeout = 5000; |
| private transient String mapname = ""; |
| |
| |
| //------------------------------------------------------------------------------ |
| // CONSTRUCTORS |
| //------------------------------------------------------------------------------ |
| |
/**
 * Creates a new map and initializes its communication with the other members.
 * @param owner Object - the owner object stored for this map
 * @param channel The channel to use for communication
 * @param timeout long - timeout for RPC messages
 * @param mapContextName String - unique name for this map, to allow multiple maps per channel
 * @param initialCapacity int - the size of this map, see HashMap
 * @param loadFactor float - load factor, see HashMap
 * @param channelSendOptions int - the send options used when messages are sent through the channel
 * @param cls - a list of classloaders to be used for deserialization of objects.
 */
public AbstractReplicatedMap(Object owner,
                             Channel channel,
                             long timeout,
                             String mapContextName,
                             int initialCapacity,
                             float loadFactor,
                             int channelSendOptions,
                             ClassLoader[] cls) {
    super(initialCapacity, loadFactor);
    //listener registration, membership broadcast and state transfer all happen in init()
    init(owner, channel, mapContextName, timeout, channelSendOptions, cls);
}
| |
| protected Member[] wrap(Member m) { |
| if ( m == null ) return new Member[0]; |
| else return new Member[] {m}; |
| } |
| |
| private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) { |
| log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName); |
| this.mapOwner = owner; |
| this.externalLoaders = cls; |
| this.channelSendOptions = channelSendOptions; |
| this.channel = channel; |
| this.rpcTimeout = timeout; |
| |
| try { |
| this.mapname = mapContextName; |
| //unique context is more efficient if it is stored as bytes |
| this.mapContextName = mapContextName.getBytes(chset); |
| } catch (UnsupportedEncodingException x) { |
| log.warn("Unable to encode mapContextName[" + mapContextName + "] using getBytes(" + chset +") using default getBytes()", x); |
| this.mapContextName = mapContextName.getBytes(); |
| } |
| if ( log.isTraceEnabled() ) log.trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.mapContextName)); |
| |
| //create an rpc channel and add the map as a listener |
| this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); |
| this.channel.addChannelListener(this); |
| this.channel.addMembershipListener(this); |
| |
| |
| try { |
| broadcast(MapMessage.MSG_INIT, true); |
| //transfer state from another map |
| transferState(); |
| broadcast(MapMessage.MSG_START, true); |
| } catch (ChannelException x) { |
| log.warn("Unable to send map start message."); |
| throw new RuntimeException("Unable to start replicated map.",x); |
| } |
| |
| } |
| |
| |
| private void ping(long timeout) throws ChannelException { |
| //send out a map membership message, only wait for the first reply |
| MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT, |
| false, null, null, null, wrap(channel.getLocalMember(false))); |
| Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.ALL_REPLY, (channelSendOptions), (int)accessTimeout); |
| for (int i = 0; i < resp.length; i++) { |
| memberAlive(resp[i].getSource()); |
| }//for |
| |
| synchronized (mapMembers) { |
| Iterator it = mapMembers.entrySet().iterator(); |
| long now = System.currentTimeMillis(); |
| while ( it.hasNext() ) { |
| Map.Entry entry = (Map.Entry)it.next(); |
| long access = ((Long)entry.getValue()).longValue(); |
| if ( (now - access) > timeout ) memberDisappeared((Member)entry.getKey()); |
| } |
| }//synch |
| } |
| |
| private void memberAlive(Member member) { |
| synchronized (mapMembers) { |
| if (!mapMembers.containsKey(member)) { |
| mapMemberAdded(member); |
| } //end if |
| mapMembers.put(member, new Long(System.currentTimeMillis())); |
| } |
| } |
| |
| private void broadcast(int msgtype, boolean rpc) throws ChannelException { |
| //send out a map membership message, only wait for the first reply |
| MapMessage msg = new MapMessage(this.mapContextName, msgtype, |
| false, null, null, null, wrap(channel.getLocalMember(false))); |
| if ( rpc) { |
| Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout); |
| for (int i = 0; i < resp.length; i++) { |
| mapMemberAdded(resp[i].getSource()); |
| messageReceived(resp[i].getMessage(), resp[i].getSource()); |
| } |
| } else { |
| channel.send(channel.getMembers(),msg,channelSendOptions); |
| } |
| } |
| |
| public void breakdown() { |
| finalize(); |
| } |
| |
| public void finalize() { |
| try {broadcast(MapMessage.MSG_STOP,false); }catch ( Exception ignore){} |
| //cleanup |
| if (this.rpcChannel != null) { |
| this.rpcChannel.breakdown(); |
| } |
| if (this.channel != null) { |
| this.channel.removeChannelListener(this); |
| this.channel.removeMembershipListener(this); |
| } |
| this.rpcChannel = null; |
| this.channel = null; |
| this.mapMembers.clear(); |
| super.clear(); |
| this.stateTransferred = false; |
| this.externalLoaders = null; |
| } |
| |
| public int hashCode() { |
| return Arrays.hashCode(this.mapContextName); |
| } |
| |
| public boolean equals(Object o) { |
| if ( o == null ) return false; |
| if ( !(o instanceof AbstractReplicatedMap)) return false; |
| if ( !(o.getClass().equals(this.getClass())) ) return false; |
| AbstractReplicatedMap other = (AbstractReplicatedMap)o; |
| return Arrays.equals(mapContextName,other.mapContextName); |
| } |
| |
| //------------------------------------------------------------------------------ |
| // GROUP COM INTERFACES |
| //------------------------------------------------------------------------------ |
| public Member[] getMapMembers(HashMap members) { |
| synchronized (members) { |
| Member[] result = new Member[members.size()]; |
| members.keySet().toArray(result); |
| return result; |
| } |
| } |
| public Member[] getMapMembers() { |
| return getMapMembers(this.mapMembers); |
| } |
| |
| public Member[] getMapMembersExcl(Member[] exclude) { |
| synchronized (mapMembers) { |
| HashMap list = (HashMap)mapMembers.clone(); |
| for (int i=0; i<exclude.length;i++) list.remove(exclude[i]); |
| return getMapMembers(list); |
| } |
| } |
| |
| |
| /** |
| * Replicates any changes to the object since the last time |
| * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br> |
| * @param complete - if set to true, the object is replicated to its backup |
| * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will |
| * be replicated |
| */ |
| public void replicate(Object key, boolean complete) { |
| if ( log.isTraceEnabled() ) |
| log.trace("Replicate invoked on key:"+key); |
| MapEntry entry = (MapEntry)super.get(key); |
| if ( !entry.isSerializable() ) return; |
| if (entry != null && entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) { |
| Object value = entry.getValue(); |
| //check to see if we need to replicate this object isDirty()||complete |
| boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty()); |
| |
| if (!repl) { |
| if ( log.isTraceEnabled() ) |
| log.trace("Not replicating:"+key+", no change made"); |
| |
| return; |
| } |
| //check to see if the message is diffable |
| boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable()); |
| MapMessage msg = null; |
| if (diff) { |
| ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue(); |
| try { |
| rentry.lock(); |
| //construct a diff message |
| msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP, |
| true, (Serializable) entry.getKey(), null, |
| rentry.getDiff(), |
| entry.getBackupNodes()); |
| } catch (IOException x) { |
| log.error("Unable to diff object. Will replicate the entire object instead.", x); |
| } finally { |
| rentry.unlock(); |
| } |
| |
| } |
| if (msg == null) { |
| //construct a complete |
| msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP, |
| false, (Serializable) entry.getKey(), |
| (Serializable) entry.getValue(), |
| null, entry.getBackupNodes()); |
| |
| } |
| try { |
| if ( channel!=null && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0 ) { |
| channel.send(entry.getBackupNodes(), msg, channelSendOptions); |
| } |
| } catch (ChannelException x) { |
| log.error("Unable to replicate data.", x); |
| } |
| } //end if |
| |
| } |
| |
| /** |
| * This can be invoked by a periodic thread to replicate out any changes. |
| * For maps that don't store objects that implement ReplicatedMapEntry, this |
| * method should be used infrequently to avoid large amounts of data transfer |
| * @param complete boolean |
| */ |
| public void replicate(boolean complete) { |
| Iterator i = super.entrySet().iterator(); |
| while (i.hasNext()) { |
| Map.Entry e = (Map.Entry) i.next(); |
| replicate(e.getKey(), complete); |
| } //while |
| |
| } |
| |
| public void transferState() { |
| try { |
| Member[] members = getMapMembers(); |
| Member backup = members.length > 0 ? (Member) members[0] : null; |
| if (backup != null) { |
| MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_STATE, false, |
| null, null, null, null); |
| Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout); |
| if (resp.length > 0) { |
| synchronized (stateMutex) { |
| msg = (MapMessage) resp[0].getMessage(); |
| msg.deserialize(getExternalLoaders()); |
| ArrayList list = (ArrayList) msg.getValue(); |
| for (int i = 0; i < list.size(); i++) { |
| messageReceived( (Serializable) list.get(i), resp[0].getSource()); |
| } //for |
| } |
| } else { |
| log.warn("Transfer state, 0 replies, probably a timeout."); |
| } |
| } |
| } catch (ChannelException x) { |
| log.error("Unable to transfer LazyReplicatedMap state.", x); |
| } catch (IOException x) { |
| log.error("Unable to transfer LazyReplicatedMap state.", x); |
| } catch (ClassNotFoundException x) { |
| log.error("Unable to transfer LazyReplicatedMap state.", x); |
| } |
| stateTransferred = true; |
| } |
| |
| /** |
| * @todo implement state transfer |
| * @param msg Serializable |
| * @return Serializable - null if no reply should be sent |
| */ |
| public Serializable replyRequest(Serializable msg, final Member sender) { |
| if (! (msg instanceof MapMessage))return null; |
| MapMessage mapmsg = (MapMessage) msg; |
| |
| //map init request |
| if (mapmsg.getMsgType() == mapmsg.MSG_INIT) { |
| mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false))); |
| return mapmsg; |
| } |
| |
| //map start request |
| if (mapmsg.getMsgType() == mapmsg.MSG_START) { |
| mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false))); |
| mapMemberAdded(sender); |
| return mapmsg; |
| } |
| |
| //backup request |
| if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) { |
| MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); |
| if (entry == null || (!entry.isSerializable()) )return null; |
| mapmsg.setValue( (Serializable) entry.getValue()); |
| return mapmsg; |
| } |
| |
| //state transfer request |
| if (mapmsg.getMsgType() == mapmsg.MSG_STATE) { |
| synchronized (stateMutex) { //make sure we dont do two things at the same time |
| ArrayList list = new ArrayList(); |
| Iterator i = super.entrySet().iterator(); |
| while (i.hasNext()) { |
| Map.Entry e = (Map.Entry) i.next(); |
| MapEntry entry = (MapEntry) e.getValue(); |
| if ( entry.isSerializable() ) { |
| MapMessage me = new MapMessage(mapContextName, MapMessage.MSG_PROXY, |
| false, (Serializable) entry.getKey(), null, null, entry.getBackupNodes()); |
| list.add(me); |
| } |
| } |
| mapmsg.setValue(list); |
| return mapmsg; |
| |
| } //synchronized |
| } |
| |
| return null; |
| |
| } |
| |
| /** |
| * If the reply has already been sent to the requesting thread, |
| * the rpc callback can handle any data that comes in after the fact. |
| * @param msg Serializable |
| * @param sender Member |
| */ |
| public void leftOver(Serializable msg, Member sender) { |
| //left over membership messages |
| if (! (msg instanceof MapMessage))return; |
| |
| MapMessage mapmsg = (MapMessage) msg; |
| try { |
| mapmsg.deserialize(getExternalLoaders()); |
| if (mapmsg.getMsgType() == MapMessage.MSG_START) { |
| mapMemberAdded(mapmsg.getBackupNodes()[0]); |
| } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) { |
| memberAlive(mapmsg.getBackupNodes()[0]); |
| } |
| } catch (IOException x ) { |
| log.error("Unable to deserialize MapMessage.",x); |
| } catch (ClassNotFoundException x ) { |
| log.error("Unable to deserialize MapMessage.",x); |
| } |
| } |
| |
| public void messageReceived(Serializable msg, Member sender) { |
| if (! (msg instanceof MapMessage)) return; |
| |
| MapMessage mapmsg = (MapMessage) msg; |
| if ( log.isTraceEnabled() ) { |
| log.trace("Map["+mapname+"] received message:"+mapmsg); |
| } |
| |
| try { |
| mapmsg.deserialize(getExternalLoaders()); |
| } catch (IOException x) { |
| log.error("Unable to deserialize MapMessage.", x); |
| return; |
| } catch (ClassNotFoundException x) { |
| log.error("Unable to deserialize MapMessage.", x); |
| return; |
| } |
| if ( log.isTraceEnabled() ) |
| log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg); |
| if (mapmsg.getMsgType() == MapMessage.MSG_START) { |
| mapMemberAdded(mapmsg.getBackupNodes()[0]); |
| } |
| |
| if (mapmsg.getMsgType() == MapMessage.MSG_STOP) { |
| memberDisappeared(mapmsg.getBackupNodes()[0]); |
| } |
| |
| if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) { |
| MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); |
| if ( entry==null ) { |
| entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue()); |
| entry.setBackup(false); |
| entry.setProxy(true); |
| entry.setBackupNodes(mapmsg.getBackupNodes()); |
| super.put(entry.getKey(), entry); |
| } else { |
| entry.setProxy(true); |
| entry.setBackup(false); |
| entry.setBackupNodes(mapmsg.getBackupNodes()); |
| } |
| } |
| |
| if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) { |
| super.remove(mapmsg.getKey()); |
| } |
| |
| if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP) { |
| MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); |
| if (entry == null) { |
| entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue()); |
| entry.setBackup(true); |
| entry.setProxy(false); |
| entry.setBackupNodes(mapmsg.getBackupNodes()); |
| if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) { |
| ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner()); |
| } |
| } else { |
| entry.setBackup(true); |
| entry.setProxy(false); |
| entry.setBackupNodes(mapmsg.getBackupNodes()); |
| if (entry.getValue() instanceof ReplicatedMapEntry) { |
| ReplicatedMapEntry diff = (ReplicatedMapEntry) entry.getValue(); |
| if (mapmsg.isDiff()) { |
| try { |
| diff.lock(); |
| diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length); |
| } catch (Exception x) { |
| log.error("Unable to apply diff to key:" + entry.getKey(), x); |
| } finally { |
| diff.unlock(); |
| } |
| } else { |
| if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue()); |
| ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner()); |
| } //end if |
| } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) { |
| ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue(); |
| re.setOwner(getMapOwner()); |
| entry.setValue(re); |
| } else { |
| if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue()); |
| } //end if |
| } //end if |
| super.put(entry.getKey(), entry); |
| } //end if |
| } |
| |
| public boolean accept(Serializable msg, Member sender) { |
| boolean result = false; |
| if (msg instanceof MapMessage) { |
| if ( log.isTraceEnabled() ) log.trace("Map["+mapname+"] accepting...."+msg); |
| result = Arrays.equals(mapContextName, ( (MapMessage) msg).getMapId()); |
| if ( log.isTraceEnabled() ) log.trace("Msg["+mapname+"] accepted["+result+"]...."+msg); |
| } |
| return result; |
| } |
| |
| public void mapMemberAdded(Member member) { |
| if ( member.equals(getChannel().getLocalMember(false)) ) return; |
| boolean memberAdded = false; |
| //select a backup node if we don't have one |
| synchronized (mapMembers) { |
| if (!mapMembers.containsKey(member) ) { |
| mapMembers.put(member, new Long(System.currentTimeMillis())); |
| memberAdded = true; |
| } |
| } |
| if ( memberAdded ) { |
| synchronized (stateMutex) { |
| Iterator i = super.entrySet().iterator(); |
| while (i.hasNext()) { |
| Map.Entry e = (Map.Entry) i.next(); |
| MapEntry entry = (MapEntry) e.getValue(); |
| if ( entry == null ) continue; |
| if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) { |
| try { |
| Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); |
| entry.setBackupNodes(backup); |
| } catch (ChannelException x) { |
| log.error("Unable to select backup node.", x); |
| } //catch |
| } //end if |
| } //while |
| } //synchronized |
| }//end if |
| } |
| |
| public boolean inSet(Member m, Member[] set) { |
| if ( set == null ) return false; |
| boolean result = false; |
| for (int i=0; i<set.length && (!result); i++ ) |
| if ( m.equals(set[i]) ) result = true; |
| return result; |
| } |
| |
| public Member[] excludeFromSet(Member[] mbrs, Member[] set) { |
| ArrayList result = new ArrayList(); |
| for (int i=0; i<set.length; i++ ) { |
| boolean include = true; |
| for (int j=0; j<mbrs.length; j++ ) |
| if ( mbrs[j].equals(set[i]) ) include = false; |
| if ( include ) result.add(set[i]); |
| } |
| return (Member[])result.toArray(new Member[result.size()]); |
| } |
| |
| public void memberAdded(Member member) { |
| //do nothing |
| } |
| |
| public void memberDisappeared(Member member) { |
| boolean removed = false; |
| synchronized (mapMembers) { |
| removed = (mapMembers.remove(member) != null ); |
| } |
| Iterator i = super.entrySet().iterator(); |
| while (i.hasNext()) { |
| Map.Entry e = (Map.Entry) i.next(); |
| MapEntry entry = (MapEntry) e.getValue(); |
| if (entry.isPrimary() && inSet(member,entry.getBackupNodes())) { |
| try { |
| Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); |
| entry.setBackupNodes(backup); |
| } catch (ChannelException x) { |
| log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); |
| } |
| } //end if |
| } //while |
| } |
| |
| public int getNextBackupIndex() { |
| int size = mapMembers.size(); |
| if (mapMembers.size() == 0)return -1; |
| int node = currentNode++; |
| if (node >= size) { |
| node = 0; |
| currentNode = 0; |
| } |
| return node; |
| } |
| public Member getNextBackupNode() { |
| Member[] members = getMapMembers(); |
| int node = getNextBackupIndex(); |
| if ( members.length == 0 || node==-1) return null; |
| if ( node >= members.length ) node = 0; |
| return members[node]; |
| } |
| |
| protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException; |
| |
| public void heartbeat() { |
| try { |
| ping(accessTimeout); |
| }catch ( Exception x ) { |
| log.error("Unable to send AbstractReplicatedMap.ping message",x); |
| } |
| } |
| |
| //------------------------------------------------------------------------------ |
| // METHODS TO OVERRIDE |
| //------------------------------------------------------------------------------ |
| |
| /** |
| * Removes an object from this map, it will also remove it from |
| * |
| * @param key Object |
| * @return Object |
| */ |
| public Object remove(Object key) { |
| MapEntry entry = (MapEntry)super.remove(key); |
| |
| try { |
| MapMessage msg = new MapMessage(getMapContextName(),MapMessage.MSG_REMOVE,false,(Serializable)key,null,null,null); |
| getChannel().send(getMapMembers(), msg,getChannelSendOptions()); |
| } catch ( ChannelException x ) { |
| log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation",x); |
| } |
| return entry!=null?entry.getValue():null; |
| } |
| |
| public Object get(Object key) { |
| MapEntry entry = (MapEntry)super.get(key); |
| if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" entry:"+entry); |
| if ( entry == null ) return null; |
| if ( !entry.isPrimary() ) { |
| //if the message is not primary, we need to retrieve the latest value |
| try { |
| Member[] backup = null; |
| MapMessage msg = null; |
| if ( !entry.isBackup() ) { |
| //make sure we don't retrieve from ourselves |
| msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, |
| (Serializable) key, null, null, null); |
| Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout()); |
| if (resp == null || resp.length == 0) { |
| //no responses |
| log.warn("Unable to retrieve remote object for key:" + key); |
| return null; |
| } |
| msg = (MapMessage) resp[0].getMessage(); |
| msg.deserialize(getExternalLoaders()); |
| backup = entry.getBackupNodes(); |
| if ( entry.getValue() instanceof ReplicatedMapEntry ) { |
| ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); |
| val.setOwner(getMapOwner()); |
| } |
| if ( msg.getValue()!=null ) entry.setValue(msg.getValue()); |
| } |
| if (entry.isBackup()) { |
| //select a new backup node |
| backup = publishEntryInfo(key, entry.getValue()); |
| } else if ( entry.isProxy() ) { |
| //invalidate the previous primary |
| msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup); |
| Member[] dest = getMapMembersExcl(backup); |
| if ( dest!=null && dest.length >0) { |
| getChannel().send(dest, msg, getChannelSendOptions()); |
| } |
| } |
| |
| entry.setBackupNodes(backup); |
| entry.setBackup(false); |
| entry.setProxy(false); |
| |
| |
| } catch (Exception x) { |
| log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x); |
| return null; |
| } |
| } |
| if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" result:"+entry.getValue()); |
| return entry.getValue(); |
| } |
| |
| |
| protected void printMap(String header) { |
| try { |
| System.out.println("\nDEBUG MAP:"+header); |
| System.out.println("Map["+ new String(mapContextName, chset) + ", Map Size:" + super.size()); |
| Member[] mbrs = getMapMembers(); |
| for ( int i=0; i<mbrs.length;i++ ) { |
| System.out.println("Mbr["+(i+1)+"="+mbrs[i].getName()); |
| } |
| Iterator i = super.entrySet().iterator(); |
| int cnt = 0; |
| |
| while (i.hasNext()) { |
| Map.Entry e = (Map.Entry) i.next(); |
| System.out.println( (++cnt) + ". " + e.getValue()); |
| } |
| System.out.println("EndMap]\n\n"); |
| }catch ( Exception ignore) { |
| ignore.printStackTrace(); |
| } |
| } |
| |
| /** |
| * Returns true if the key has an entry in the map. |
| * The entry can be a proxy or a backup entry, invoking <code>get(key)</code> |
| * will make this entry primary for the group |
| * @param key Object |
| * @return boolean |
| */ |
| public boolean containsKey(Object key) { |
| return super.containsKey(key); |
| } |
| |
| |
| public Object put(Object key, Object value) { |
| MapEntry entry = new MapEntry(key,value); |
| entry.setBackup(false); |
| entry.setProxy(false); |
| |
| Object old = null; |
| |
| //make sure that any old values get removed |
| if ( containsKey(key) ) old = remove(key); |
| try { |
| Member[] backup = publishEntryInfo(key, value); |
| entry.setBackupNodes(backup); |
| } catch (ChannelException x) { |
| log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x); |
| } |
| super.put(key,entry); |
| return old; |
| } |
| |
| |
| /** |
| * Copies all values from one map to this instance |
| * @param m Map |
| */ |
| public void putAll(Map m) { |
| Iterator i = m.entrySet().iterator(); |
| while ( i.hasNext() ) { |
| Map.Entry entry = (Map.Entry)i.next(); |
| put(entry.getKey(),entry.getValue()); |
| } |
| } |
| |
| public void clear() { |
| //only delete active keys |
| Iterator keys = keySet().iterator(); |
| while ( keys.hasNext() ) remove(keys.next()); |
| } |
| |
| public boolean containsValue(Object value) { |
| if ( value == null ) { |
| return super.containsValue(value); |
| } else { |
| Iterator i = super.entrySet().iterator(); |
| while (i.hasNext()) { |
| Map.Entry e = (Map.Entry) i.next(); |
| MapEntry entry = (MapEntry) e.getValue(); |
| if (entry.isPrimary() && value.equals(entry.getValue())) return true; |
| }//while |
| return false; |
| }//end if |
| } |
| |
| public Object clone() { |
| throw new UnsupportedOperationException("This operation is not valid on a replicated map"); |
| } |
| |
| /** |
| * Returns the entire contents of the map |
| * Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information |
| * about the object. |
| * @return Set |
| */ |
| public Set entrySetFull() { |
| return super.entrySet(); |
| } |
| |
| public Set keySetFull() { |
| return super.keySet(); |
| } |
| |
| public int sizeFull() { |
| return super.size(); |
| } |
| |
| public Set entrySet() { |
| LinkedHashSet set = new LinkedHashSet(super.size()); |
| Iterator i = super.entrySet().iterator(); |
| while ( i.hasNext() ) { |
| Map.Entry e = (Map.Entry)i.next(); |
| MapEntry entry = (MapEntry)e.getValue(); |
| if ( entry.isPrimary() ) set.add(entry); |
| } |
| return Collections.unmodifiableSet(set); |
| } |
| |
| public Set keySet() { |
| //todo implement |
| //should only return keys where this is active. |
| LinkedHashSet set = new LinkedHashSet(super.size()); |
| Iterator i = super.entrySet().iterator(); |
| while ( i.hasNext() ) { |
| Map.Entry e = (Map.Entry)i.next(); |
| MapEntry entry = (MapEntry)e.getValue(); |
| if ( entry.isPrimary() ) set.add(entry.getKey()); |
| } |
| return Collections.unmodifiableSet(set); |
| } |
| |
| |
| public int size() { |
| //todo, implement a counter variable instead |
| //only count active members in this node |
| int counter = 0; |
| Object[] items = super.entrySet().toArray(); |
| for (int i=0; i<items.length; i++ ) { |
| Map.Entry e = (Map.Entry) items[i]; |
| if ( e != null ) { |
| MapEntry entry = (MapEntry) e.getValue(); |
| if (entry.isPrimary() && entry.getValue() != null) counter++; |
| } |
| } |
| return counter; |
| } |
| |
| protected boolean removeEldestEntry(Map.Entry eldest) { |
| return false; |
| } |
| |
| public boolean isEmpty() { |
| return size()==0; |
| } |
| |
| public Collection values() { |
| ArrayList values = new ArrayList(super.size()); |
| Iterator i = super.entrySet().iterator(); |
| while ( i.hasNext() ) { |
| Map.Entry e = (Map.Entry)i.next(); |
| MapEntry entry = (MapEntry)e.getValue(); |
| if ( entry.isPrimary() && entry.getValue()!=null) values.add(entry.getValue()); |
| } |
| return Collections.unmodifiableCollection(values); |
| } |
| |
| |
| //------------------------------------------------------------------------------ |
| // Map Entry class |
| //------------------------------------------------------------------------------ |
| public static class MapEntry implements Map.Entry { |
| private boolean backup; |
| private boolean proxy; |
| private Member[] backupNodes; |
| |
| private Object key; |
| private Object value; |
| |
| public MapEntry(Object key, Object value) { |
| setKey(key); |
| setValue(value); |
| |
| } |
| |
| public boolean isKeySerializable() { |
| return (key == null) || (key instanceof Serializable); |
| } |
| |
| public boolean isValueSerializable() { |
| return (value==null) || (value instanceof Serializable); |
| } |
| |
| public boolean isSerializable() { |
| return isKeySerializable() && isValueSerializable(); |
| } |
| |
| public boolean isBackup() { |
| return backup; |
| } |
| |
| public void setBackup(boolean backup) { |
| this.backup = backup; |
| } |
| |
| public boolean isProxy() { |
| return proxy; |
| } |
| |
| public boolean isPrimary() { |
| return ( (!proxy) && (!backup)); |
| } |
| |
| public void setProxy(boolean proxy) { |
| this.proxy = proxy; |
| } |
| |
| public boolean isDiffable() { |
| return (value instanceof ReplicatedMapEntry) && |
| ((ReplicatedMapEntry)value).isDiffable(); |
| } |
| |
| public void setBackupNodes(Member[] nodes) { |
| this.backupNodes = nodes; |
| } |
| |
| public Member[] getBackupNodes() { |
| return backupNodes; |
| } |
| |
| public Object getValue() { |
| return value; |
| } |
| |
| public Object setValue(Object value) { |
| Object old = this.value; |
| this.value = (Serializable) value; |
| return old; |
| } |
| |
| public Object getKey() { |
| return key; |
| } |
| |
| public Object setKey(Object key) { |
| Object old = this.key; |
| this.key = (Serializable)key; |
| return old; |
| } |
| |
| public int hashCode() { |
| return key.hashCode(); |
| } |
| |
| public boolean equals(Object o) { |
| return key.equals(o); |
| } |
| |
| /** |
| * apply a diff, or an entire object |
| * @param data byte[] |
| * @param offset int |
| * @param length int |
| * @param diff boolean |
| * @throws IOException |
| * @throws ClassNotFoundException |
| */ |
| public void apply(byte[] data, int offset, int length, boolean diff) throws IOException, ClassNotFoundException { |
| if (isDiffable() && diff) { |
| ReplicatedMapEntry rentry = (ReplicatedMapEntry) value; |
| try { |
| rentry.lock(); |
| rentry.applyDiff(data, offset, length); |
| } finally { |
| rentry.unlock(); |
| } |
| } else if (length == 0) { |
| value = null; |
| proxy = true; |
| } else { |
| value = XByteBuffer.deserialize(data, offset, length); |
| } |
| } |
| |
| public String toString() { |
| StringBuffer buf = new StringBuffer("MapEntry[key:"); |
| buf.append(getKey()).append("; "); |
| buf.append("value:").append(getValue()).append("; "); |
| buf.append("primary:").append(isPrimary()).append("; "); |
| buf.append("backup:").append(isBackup()).append("; "); |
| buf.append("proxy:").append(isProxy()).append(";]"); |
| return buf.toString(); |
| } |
| |
| } |
| |
| //------------------------------------------------------------------------------ |
| // map message to send to and from other maps |
| //------------------------------------------------------------------------------ |
| |
| public static class MapMessage implements Serializable { |
| public static final int MSG_BACKUP = 1; |
| public static final int MSG_RETRIEVE_BACKUP = 2; |
| public static final int MSG_PROXY = 3; |
| public static final int MSG_REMOVE = 4; |
| public static final int MSG_STATE = 5; |
| public static final int MSG_START = 6; |
| public static final int MSG_STOP = 7; |
| public static final int MSG_INIT = 8; |
| |
| private byte[] mapId; |
| private int msgtype; |
| private boolean diff; |
| private transient Serializable key; |
| private transient Serializable value; |
| private byte[] valuedata; |
| private byte[] keydata; |
| private byte[] diffvalue; |
| private Member[] nodes; |
| |
| public String toString() { |
| StringBuffer buf = new StringBuffer("MapMessage[context="); |
| buf.append(new String(mapId)); |
| buf.append("; type="); |
| buf.append(getTypeDesc()); |
| buf.append("; key="); |
| buf.append(key); |
| buf.append("; value="); |
| buf.append(value); |
| return buf.toString(); |
| } |
| |
| public String getTypeDesc() { |
| switch (msgtype) { |
| case MSG_BACKUP: return "MSG_BACKUP"; |
| case MSG_RETRIEVE_BACKUP: return "MSG_RETRIEVE_BACKUP"; |
| case MSG_PROXY: return "MSG_PROXY"; |
| case MSG_REMOVE: return "MSG_REMOVE"; |
| case MSG_STATE: return "MSG_STATE"; |
| case MSG_START: return "MSG_START"; |
| case MSG_STOP: return "MSG_STOP"; |
| case MSG_INIT: return "MSG_INIT"; |
| default : return "UNKNOWN"; |
| } |
| } |
| |
| public MapMessage() {} |
| |
| public MapMessage(byte[] mapId,int msgtype, boolean diff, |
| Serializable key, Serializable value, |
| byte[] diffvalue, Member[] nodes) { |
| this.mapId = mapId; |
| this.msgtype = msgtype; |
| this.diff = diff; |
| this.key = key; |
| this.value = value; |
| this.diffvalue = diffvalue; |
| this.nodes = nodes; |
| setValue(value); |
| setKey(key); |
| } |
| |
| public void deserialize(ClassLoader[] cls) throws IOException, ClassNotFoundException { |
| key(cls); |
| value(cls); |
| } |
| |
| public int getMsgType() { |
| return msgtype; |
| } |
| |
| public boolean isDiff() { |
| return diff; |
| } |
| |
| public Serializable getKey() { |
| try { |
| return key(null); |
| } catch ( Exception x ) { |
| log.error("Deserialization error of the MapMessage.key",x); |
| return null; |
| } |
| } |
| |
| public Serializable key(ClassLoader[] cls) throws IOException, ClassNotFoundException { |
| if ( key!=null ) return key; |
| if ( keydata == null || keydata.length == 0 ) return null; |
| key = XByteBuffer.deserialize(keydata,0,keydata.length,cls); |
| keydata = null; |
| return key; |
| } |
| |
| public byte[] getKeyData() { |
| return keydata; |
| } |
| |
| public Serializable getValue() { |
| try { |
| return value(null); |
| } catch ( Exception x ) { |
| log.error("Deserialization error of the MapMessage.value",x); |
| return null; |
| } |
| } |
| |
| public Serializable value(ClassLoader[] cls) throws IOException, ClassNotFoundException { |
| if ( value!=null ) return value; |
| if ( valuedata == null || valuedata.length == 0 ) return null; |
| value = XByteBuffer.deserialize(valuedata,0,valuedata.length,cls); |
| valuedata = null;; |
| return value; |
| } |
| |
| public byte[] getValueData() { |
| return valuedata; |
| } |
| |
| public byte[] getDiffValue() { |
| return diffvalue; |
| } |
| |
| public Member[] getBackupNodes() { |
| return nodes; |
| } |
| |
| private void setBackUpNodes(Member[] nodes) { |
| this.nodes = nodes; |
| } |
| |
| public byte[] getMapId() { |
| return mapId; |
| } |
| |
| public void setValue(Serializable value) { |
| try { |
| if ( value != null ) valuedata = XByteBuffer.serialize(value); |
| this.value = value; |
| }catch ( IOException x ) { |
| throw new RuntimeException(x); |
| } |
| } |
| |
| public void setKey(Serializable key) { |
| try { |
| if (key != null) keydata = XByteBuffer.serialize(key); |
| this.key = key; |
| } catch (IOException x) { |
| throw new RuntimeException(x); |
| } |
| } |
| |
| protected Member[] readMembers(ObjectInput in) throws IOException, ClassNotFoundException { |
| int nodecount = in.readInt(); |
| Member[] members = new Member[nodecount]; |
| for ( int i=0; i<members.length; i++ ) { |
| byte[] d = new byte[in.readInt()]; |
| in.read(d); |
| if (d.length > 0) members[i] = MemberImpl.getMember(d); |
| } |
| return members; |
| } |
| |
| protected void writeMembers(ObjectOutput out,Member[] members) throws IOException { |
| if ( members == null ) members = new Member[0]; |
| out.writeInt(members.length); |
| for (int i=0; i<members.length; i++ ) { |
| if ( members[i] != null ) { |
| byte[] d = members[i] != null ? ( (MemberImpl)members[i]).getData(false) : new byte[0]; |
| out.writeInt(d.length); |
| out.write(d); |
| } |
| } |
| } |
| |
| |
| /** |
| * shallow clone |
| * @return Object |
| */ |
| public Object clone() { |
| MapMessage msg = new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.nodes); |
| msg.keydata = this.keydata; |
| msg.valuedata = this.valuedata; |
| return msg; |
| } |
| } //MapMessage |
| |
| |
| public Channel getChannel() { |
| return channel; |
| } |
| |
| public byte[] getMapContextName() { |
| return mapContextName; |
| } |
| |
| public RpcChannel getRpcChannel() { |
| return rpcChannel; |
| } |
| |
| public long getRpcTimeout() { |
| return rpcTimeout; |
| } |
| |
| public Object getStateMutex() { |
| return stateMutex; |
| } |
| |
| public boolean isStateTransferred() { |
| return stateTransferred; |
| } |
| |
| public Object getMapOwner() { |
| return mapOwner; |
| } |
| |
| public ClassLoader[] getExternalLoaders() { |
| return externalLoaders; |
| } |
| |
| public int getChannelSendOptions() { |
| return channelSendOptions; |
| } |
| |
| public long getAccessTimeout() { |
| return accessTimeout; |
| } |
| |
| public void setMapOwner(Object mapOwner) { |
| this.mapOwner = mapOwner; |
| } |
| |
| public void setExternalLoaders(ClassLoader[] externalLoaders) { |
| this.externalLoaders = externalLoaders; |
| } |
| |
| public void setChannelSendOptions(int channelSendOptions) { |
| this.channelSendOptions = channelSendOptions; |
| } |
| |
| public void setAccessTimeout(long accessTimeout) { |
| this.accessTimeout = accessTimeout; |
| } |
| |
| } |