blob: 1d3561f567d191b78e554f6256863079b51f2c5b [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.catalina.tribes.tipis;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Heartbeat;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.catalina.tribes.util.Arrays;
import java.util.ConcurrentModificationException;
* <p>Title: </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2005</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener, Heartbeat {
protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class);
* The default initial capacity - MUST be a power of two.
public static final int DEFAULT_INITIAL_CAPACITY = 16;
* The load factor used when none specified in constructor.
public static final float DEFAULT_LOAD_FACTOR = 0.75f;
* Used to identify the map
final String chset = "ISO-8859-1";
private transient long rpcTimeout = 5000;
private transient Channel channel;
private transient RpcChannel rpcChannel;
private transient byte[] mapContextName;
private transient boolean stateTransferred = false;
private transient Object stateMutex = new Object();
private transient HashMap mapMembers = new HashMap();
private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
private transient Object mapOwner;
private transient ClassLoader[] externalLoaders;
protected transient int currentNode = 0;
private transient long accessTimeout = 5000;
private transient String mapname = "";
* Creates a new map
* @param channel The channel to use for communication
* @param timeout long - timeout for RPC messags
* @param mapContextName String - unique name for this map, to allow multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
* @param loadFactor float - load factor, see HashMap
* @param cls - a list of classloaders to be used for deserialization of objects.
public AbstractReplicatedMap(Object owner,
Channel channel,
long timeout,
String mapContextName,
int initialCapacity,
float loadFactor,
int channelSendOptions,
ClassLoader[] cls) {
super(initialCapacity, loadFactor);
init(owner, channel, mapContextName, timeout, channelSendOptions, cls);
protected Member[] wrap(Member m) {
if ( m == null ) return new Member[0];
else return new Member[] {m};
private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) {"Initializing AbstractReplicatedMap with context name:"+mapContextName);
this.mapOwner = owner;
this.externalLoaders = cls;
this.channelSendOptions = channelSendOptions; = channel;
this.rpcTimeout = timeout;
try {
this.mapname = mapContextName;
//unique context is more efficient if it is stored as bytes
this.mapContextName = mapContextName.getBytes(chset);
} catch (UnsupportedEncodingException x) {
log.warn("Unable to encode mapContextName[" + mapContextName + "] using getBytes(" + chset +") using default getBytes()", x);
this.mapContextName = mapContextName.getBytes();
if ( log.isTraceEnabled() ) log.trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.mapContextName));
//create an rpc channel and add the map as a listener
this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);;;
try {
broadcast(MapMessage.MSG_INIT, true);
//transfer state from another map
broadcast(MapMessage.MSG_START, true);
} catch (ChannelException x) {
log.warn("Unable to send map start message.");
throw new RuntimeException("Unable to start replicated map.",x);
private void ping(long timeout) throws ChannelException {
//send out a map membership message, only wait for the first reply
MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT,
false, null, null, null, wrap(channel.getLocalMember(false)));
Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.ALL_REPLY, (channelSendOptions), (int)accessTimeout);
for (int i = 0; i < resp.length; i++) {
synchronized (mapMembers) {
Iterator it = mapMembers.entrySet().iterator();
long now = System.currentTimeMillis();
while ( it.hasNext() ) {
Map.Entry entry = (Map.Entry);
long access = ((Long)entry.getValue()).longValue();
if ( (now - access) > timeout ) memberDisappeared((Member)entry.getKey());
private void memberAlive(Member member) {
synchronized (mapMembers) {
if (!mapMembers.containsKey(member)) {
} //end if
mapMembers.put(member, new Long(System.currentTimeMillis()));
private void broadcast(int msgtype, boolean rpc) throws ChannelException {
//send out a map membership message, only wait for the first reply
MapMessage msg = new MapMessage(this.mapContextName, msgtype,
false, null, null, null, wrap(channel.getLocalMember(false)));
if ( rpc) {
Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
for (int i = 0; i < resp.length; i++) {
messageReceived(resp[i].getMessage(), resp[i].getSource());
} else {
public void breakdown() {
public void finalize() {
try {broadcast(MapMessage.MSG_STOP,false); }catch ( Exception ignore){}
if (this.rpcChannel != null) {
if ( != null) {;;
this.rpcChannel = null; = null;
this.stateTransferred = false;
this.externalLoaders = null;
public int hashCode() {
return Arrays.hashCode(this.mapContextName);
public boolean equals(Object o) {
if ( o == null ) return false;
if ( !(o instanceof AbstractReplicatedMap)) return false;
if ( !(o.getClass().equals(this.getClass())) ) return false;
AbstractReplicatedMap other = (AbstractReplicatedMap)o;
return Arrays.equals(mapContextName,other.mapContextName);
public Member[] getMapMembers(HashMap members) {
synchronized (members) {
Member[] result = new Member[members.size()];
return result;
public Member[] getMapMembers() {
return getMapMembers(this.mapMembers);
public Member[] getMapMembersExcl(Member[] exclude) {
synchronized (mapMembers) {
HashMap list = (HashMap)mapMembers.clone();
for (int i=0; i<exclude.length;i++) list.remove(exclude[i]);
return getMapMembers(list);
* Replicates any changes to the object since the last time
* The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br>
* @param complete - if set to true, the object is replicated to its backup
* if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will
* be replicated
public void replicate(Object key, boolean complete) {
if ( log.isTraceEnabled() )
log.trace("Replicate invoked on key:"+key);
MapEntry entry = (MapEntry)super.get(key);
if ( !entry.isSerializable() ) return;
if (entry != null && entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) {
Object value = entry.getValue();
//check to see if we need to replicate this object isDirty()||complete
boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty());
if (!repl) {
if ( log.isTraceEnabled() )
log.trace("Not replicating:"+key+", no change made");
//check to see if the message is diffable
boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable());
MapMessage msg = null;
if (diff) {
ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue();
try {
//construct a diff message
msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
true, (Serializable) entry.getKey(), null,
} catch (IOException x) {
log.error("Unable to diff object. Will replicate the entire object instead.", x);
} finally {
if (msg == null) {
//construct a complete
msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
false, (Serializable) entry.getKey(),
(Serializable) entry.getValue(),
null, entry.getBackupNodes());
try {
if ( channel!=null && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0 ) {
channel.send(entry.getBackupNodes(), msg, channelSendOptions);
} catch (ChannelException x) {
log.error("Unable to replicate data.", x);
} //end if
* This can be invoked by a periodic thread to replicate out any changes.
* For maps that don't store objects that implement ReplicatedMapEntry, this
* method should be used infrequently to avoid large amounts of data transfer
* @param complete boolean
public void replicate(boolean complete) {
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
Map.Entry e = (Map.Entry);
replicate(e.getKey(), complete);
} //while
public void transferState() {
try {
Member[] members = getMapMembers();
Member backup = members.length > 0 ? (Member) members[0] : null;
if (backup != null) {
MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_STATE, false,
null, null, null, null);
Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
if (resp.length > 0) {
synchronized (stateMutex) {
msg = (MapMessage) resp[0].getMessage();
ArrayList list = (ArrayList) msg.getValue();
for (int i = 0; i < list.size(); i++) {
messageReceived( (Serializable) list.get(i), resp[0].getSource());
} //for
} else {
log.warn("Transfer state, 0 replies, probably a timeout.");
} catch (ChannelException x) {
log.error("Unable to transfer LazyReplicatedMap state.", x);
} catch (IOException x) {
log.error("Unable to transfer LazyReplicatedMap state.", x);
} catch (ClassNotFoundException x) {
log.error("Unable to transfer LazyReplicatedMap state.", x);
stateTransferred = true;
* @todo implement state transfer
* @param msg Serializable
* @return Serializable - null if no reply should be sent
public Serializable replyRequest(Serializable msg, final Member sender) {
if (! (msg instanceof MapMessage))return null;
MapMessage mapmsg = (MapMessage) msg;
//map init request
if (mapmsg.getMsgType() == mapmsg.MSG_INIT) {
return mapmsg;
//map start request
if (mapmsg.getMsgType() == mapmsg.MSG_START) {
return mapmsg;
//backup request
if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) {
MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
if (entry == null || (!entry.isSerializable()) )return null;
mapmsg.setValue( (Serializable) entry.getValue());
return mapmsg;
//state transfer request
if (mapmsg.getMsgType() == mapmsg.MSG_STATE) {
synchronized (stateMutex) { //make sure we dont do two things at the same time
ArrayList list = new ArrayList();
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
Map.Entry e = (Map.Entry);
MapEntry entry = (MapEntry) e.getValue();
if ( entry.isSerializable() ) {
MapMessage me = new MapMessage(mapContextName, MapMessage.MSG_PROXY,
false, (Serializable) entry.getKey(), null, null, entry.getBackupNodes());
return mapmsg;
} //synchronized
return null;
* If the reply has already been sent to the requesting thread,
* the rpc callback can handle any data that comes in after the fact.
* @param msg Serializable
* @param sender Member
public void leftOver(Serializable msg, Member sender) {
//left over membership messages
if (! (msg instanceof MapMessage))return;
MapMessage mapmsg = (MapMessage) msg;
try {
if (mapmsg.getMsgType() == MapMessage.MSG_START) {
} else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
} catch (IOException x ) {
log.error("Unable to deserialize MapMessage.",x);
} catch (ClassNotFoundException x ) {
log.error("Unable to deserialize MapMessage.",x);
public void messageReceived(Serializable msg, Member sender) {
if (! (msg instanceof MapMessage)) return;
MapMessage mapmsg = (MapMessage) msg;
if ( log.isTraceEnabled() ) {
log.trace("Map["+mapname+"] received message:"+mapmsg);
try {
} catch (IOException x) {
log.error("Unable to deserialize MapMessage.", x);
} catch (ClassNotFoundException x) {
log.error("Unable to deserialize MapMessage.", x);
if ( log.isTraceEnabled() )
log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg);
if (mapmsg.getMsgType() == MapMessage.MSG_START) {
if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
if ( entry==null ) {
entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
super.put(entry.getKey(), entry);
} else {
if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) {
if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP) {
MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
if (entry == null) {
entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
} else {
if (entry.getValue() instanceof ReplicatedMapEntry) {
ReplicatedMapEntry diff = (ReplicatedMapEntry) entry.getValue();
if (mapmsg.isDiff()) {
try {
diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
} catch (Exception x) {
log.error("Unable to apply diff to key:" + entry.getKey(), x);
} finally {
} else {
if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
} //end if
} else if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue();
} else {
if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
} //end if
} //end if
super.put(entry.getKey(), entry);
} //end if
public boolean accept(Serializable msg, Member sender) {
boolean result = false;
if (msg instanceof MapMessage) {
if ( log.isTraceEnabled() ) log.trace("Map["+mapname+"] accepting...."+msg);
result = Arrays.equals(mapContextName, ( (MapMessage) msg).getMapId());
if ( log.isTraceEnabled() ) log.trace("Msg["+mapname+"] accepted["+result+"]...."+msg);
return result;
public void mapMemberAdded(Member member) {
if ( member.equals(getChannel().getLocalMember(false)) ) return;
boolean memberAdded = false;
//select a backup node if we don't have one
synchronized (mapMembers) {
if (!mapMembers.containsKey(member) ) {
mapMembers.put(member, new Long(System.currentTimeMillis()));
memberAdded = true;
if ( memberAdded ) {
synchronized (stateMutex) {
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
Map.Entry e = (Map.Entry);
MapEntry entry = (MapEntry) e.getValue();
if ( entry == null ) continue;
if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
try {
Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
} catch (ChannelException x) {
log.error("Unable to select backup node.", x);
} //catch
} //end if
} //while
} //synchronized
}//end if
public boolean inSet(Member m, Member[] set) {
if ( set == null ) return false;
boolean result = false;
for (int i=0; i<set.length && (!result); i++ )
if ( m.equals(set[i]) ) result = true;
return result;
public Member[] excludeFromSet(Member[] mbrs, Member[] set) {
ArrayList result = new ArrayList();
for (int i=0; i<set.length; i++ ) {
boolean include = true;
for (int j=0; j<mbrs.length; j++ )
if ( mbrs[j].equals(set[i]) ) include = false;
if ( include ) result.add(set[i]);
return (Member[])result.toArray(new Member[result.size()]);
public void memberAdded(Member member) {
//do nothing
public void memberDisappeared(Member member) {
boolean removed = false;
synchronized (mapMembers) {
removed = (mapMembers.remove(member) != null );
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
Map.Entry e = (Map.Entry);
MapEntry entry = (MapEntry) e.getValue();
if (entry.isPrimary() && inSet(member,entry.getBackupNodes())) {
try {
Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
} catch (ChannelException x) {
log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
} //end if
} //while
public int getNextBackupIndex() {
int size = mapMembers.size();
if (mapMembers.size() == 0)return -1;
int node = currentNode++;
if (node >= size) {
node = 0;
currentNode = 0;
return node;
public Member getNextBackupNode() {
Member[] members = getMapMembers();
int node = getNextBackupIndex();
if ( members.length == 0 || node==-1) return null;
if ( node >= members.length ) node = 0;
return members[node];
protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;
public void heartbeat() {
try {
}catch ( Exception x ) {
log.error("Unable to send message",x);
* Removes an object from this map, it will also remove it from
* @param key Object
* @return Object
public Object remove(Object key) {
MapEntry entry = (MapEntry)super.remove(key);
try {
MapMessage msg = new MapMessage(getMapContextName(),MapMessage.MSG_REMOVE,false,(Serializable)key,null,null,null);
getChannel().send(getMapMembers(), msg,getChannelSendOptions());
} catch ( ChannelException x ) {
log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation",x);
return entry!=null?entry.getValue():null;
public Object get(Object key) {
MapEntry entry = (MapEntry)super.get(key);
if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" entry:"+entry);
if ( entry == null ) return null;
if ( !entry.isPrimary() ) {
//if the message is not primary, we need to retrieve the latest value
try {
Member[] backup = null;
MapMessage msg = null;
if ( !entry.isBackup() ) {
//make sure we don't retrieve from ourselves
msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
(Serializable) key, null, null, null);
Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout());
if (resp == null || resp.length == 0) {
//no responses
log.warn("Unable to retrieve remote object for key:" + key);
return null;
msg = (MapMessage) resp[0].getMessage();
backup = entry.getBackupNodes();
if ( entry.getValue() instanceof ReplicatedMapEntry ) {
ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
if ( msg.getValue()!=null ) entry.setValue(msg.getValue());
if (entry.isBackup()) {
//select a new backup node
backup = publishEntryInfo(key, entry.getValue());
} else if ( entry.isProxy() ) {
//invalidate the previous primary
msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
Member[] dest = getMapMembersExcl(backup);
if ( dest!=null && dest.length >0) {
getChannel().send(dest, msg, getChannelSendOptions());
} catch (Exception x) {
log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x);
return null;
if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" result:"+entry.getValue());
return entry.getValue();
protected void printMap(String header) {
try {
System.out.println("\nDEBUG MAP:"+header);
System.out.println("Map["+ new String(mapContextName, chset) + ", Map Size:" + super.size());
Member[] mbrs = getMapMembers();
for ( int i=0; i<mbrs.length;i++ ) {
Iterator i = super.entrySet().iterator();
int cnt = 0;
while (i.hasNext()) {
Map.Entry e = (Map.Entry);
System.out.println( (++cnt) + ". " + e.getValue());
}catch ( Exception ignore) {
* Returns true if the key has an entry in the map.
* The entry can be a proxy or a backup entry, invoking <code>get(key)</code>
* will make this entry primary for the group
* @param key Object
* @return boolean
public boolean containsKey(Object key) {
return super.containsKey(key);
public Object put(Object key, Object value) {
MapEntry entry = new MapEntry(key,value);
Object old = null;
//make sure that any old values get removed
if ( containsKey(key) ) old = remove(key);
try {
Member[] backup = publishEntryInfo(key, value);
} catch (ChannelException x) {
log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x);
return old;
* Copies all values from one map to this instance
* @param m Map
public void putAll(Map m) {
Iterator i = m.entrySet().iterator();
while ( i.hasNext() ) {
Map.Entry entry = (Map.Entry);
public void clear() {
//only delete active keys
Iterator keys = keySet().iterator();
while ( keys.hasNext() ) remove(;
public boolean containsValue(Object value) {
if ( value == null ) {
return super.containsValue(value);
} else {
Iterator i = super.entrySet().iterator();
while (i.hasNext()) {
Map.Entry e = (Map.Entry);
MapEntry entry = (MapEntry) e.getValue();
if (entry.isPrimary() && value.equals(entry.getValue())) return true;
return false;
}//end if
public Object clone() {
throw new UnsupportedOperationException("This operation is not valid on a replicated map");
* Returns the entire contents of the map
* Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information
* about the object.
* @return Set
public Set entrySetFull() {
return super.entrySet();
public Set keySetFull() {
return super.keySet();
public int sizeFull() {
return super.size();
public Set entrySet() {
LinkedHashSet set = new LinkedHashSet(super.size());
Iterator i = super.entrySet().iterator();
while ( i.hasNext() ) {
Map.Entry e = (Map.Entry);
MapEntry entry = (MapEntry)e.getValue();
if ( entry.isPrimary() ) set.add(entry);
return Collections.unmodifiableSet(set);
public Set keySet() {
//todo implement
//should only return keys where this is active.
LinkedHashSet set = new LinkedHashSet(super.size());
Iterator i = super.entrySet().iterator();
while ( i.hasNext() ) {
Map.Entry e = (Map.Entry);
MapEntry entry = (MapEntry)e.getValue();
if ( entry.isPrimary() ) set.add(entry.getKey());
return Collections.unmodifiableSet(set);
public int size() {
//todo, implement a counter variable instead
//only count active members in this node
int counter = 0;
Object[] items = super.entrySet().toArray();
for (int i=0; i<items.length; i++ ) {
Map.Entry e = (Map.Entry) items[i];
if ( e != null ) {
MapEntry entry = (MapEntry) e.getValue();
if (entry.isPrimary() && entry.getValue() != null) counter++;
return counter;
protected boolean removeEldestEntry(Map.Entry eldest) {
return false;
public boolean isEmpty() {
return size()==0;
public Collection values() {
ArrayList values = new ArrayList(super.size());
Iterator i = super.entrySet().iterator();
while ( i.hasNext() ) {
Map.Entry e = (Map.Entry);
MapEntry entry = (MapEntry)e.getValue();
if ( entry.isPrimary() && entry.getValue()!=null) values.add(entry.getValue());
return Collections.unmodifiableCollection(values);
// Map Entry class
public static class MapEntry implements Map.Entry {
private boolean backup;
private boolean proxy;
private Member[] backupNodes;
private Object key;
private Object value;
public MapEntry(Object key, Object value) {
public boolean isKeySerializable() {
return (key == null) || (key instanceof Serializable);
public boolean isValueSerializable() {
return (value==null) || (value instanceof Serializable);
public boolean isSerializable() {
return isKeySerializable() && isValueSerializable();
public boolean isBackup() {
return backup;
public void setBackup(boolean backup) {
this.backup = backup;
public boolean isProxy() {
return proxy;
public boolean isPrimary() {
return ( (!proxy) && (!backup));
public void setProxy(boolean proxy) {
this.proxy = proxy;
public boolean isDiffable() {
return (value instanceof ReplicatedMapEntry) &&
public void setBackupNodes(Member[] nodes) {
this.backupNodes = nodes;
public Member[] getBackupNodes() {
return backupNodes;
public Object getValue() {
return value;
public Object setValue(Object value) {
Object old = this.value;
this.value = (Serializable) value;
return old;
public Object getKey() {
return key;
public Object setKey(Object key) {
Object old = this.key;
this.key = (Serializable)key;
return old;
public int hashCode() {
return key.hashCode();
public boolean equals(Object o) {
return key.equals(o);
* apply a diff, or an entire object
* @param data byte[]
* @param offset int
* @param length int
* @param diff boolean
* @throws IOException
* @throws ClassNotFoundException
public void apply(byte[] data, int offset, int length, boolean diff) throws IOException, ClassNotFoundException {
if (isDiffable() && diff) {
ReplicatedMapEntry rentry = (ReplicatedMapEntry) value;
try {
rentry.applyDiff(data, offset, length);
} finally {
} else if (length == 0) {
value = null;
proxy = true;
} else {
value = XByteBuffer.deserialize(data, offset, length);
public String toString() {
StringBuffer buf = new StringBuffer("MapEntry[key:");
buf.append(getKey()).append("; ");
buf.append("value:").append(getValue()).append("; ");
buf.append("primary:").append(isPrimary()).append("; ");
buf.append("backup:").append(isBackup()).append("; ");
return buf.toString();
// map message to send to and from other maps
public static class MapMessage implements Serializable {
public static final int MSG_BACKUP = 1;
public static final int MSG_RETRIEVE_BACKUP = 2;
public static final int MSG_PROXY = 3;
public static final int MSG_REMOVE = 4;
public static final int MSG_STATE = 5;
public static final int MSG_START = 6;
public static final int MSG_STOP = 7;
public static final int MSG_INIT = 8;
private byte[] mapId;
private int msgtype;
private boolean diff;
private transient Serializable key;
private transient Serializable value;
private byte[] valuedata;
private byte[] keydata;
private byte[] diffvalue;
private Member[] nodes;
public String toString() {
StringBuffer buf = new StringBuffer("MapMessage[context=");
buf.append(new String(mapId));
buf.append("; type=");
buf.append("; key=");
buf.append("; value=");
return buf.toString();
public String getTypeDesc() {
switch (msgtype) {
case MSG_BACKUP: return "MSG_BACKUP";
case MSG_PROXY: return "MSG_PROXY";
case MSG_REMOVE: return "MSG_REMOVE";
case MSG_STATE: return "MSG_STATE";
case MSG_START: return "MSG_START";
case MSG_STOP: return "MSG_STOP";
case MSG_INIT: return "MSG_INIT";
default : return "UNKNOWN";
public MapMessage() {}
public MapMessage(byte[] mapId,int msgtype, boolean diff,
Serializable key, Serializable value,
byte[] diffvalue, Member[] nodes) {
this.mapId = mapId;
this.msgtype = msgtype;
this.diff = diff;
this.key = key;
this.value = value;
this.diffvalue = diffvalue;
this.nodes = nodes;
public void deserialize(ClassLoader[] cls) throws IOException, ClassNotFoundException {
public int getMsgType() {
return msgtype;
public boolean isDiff() {
return diff;
public Serializable getKey() {
try {
return key(null);
} catch ( Exception x ) {
log.error("Deserialization error of the MapMessage.key",x);
return null;
public Serializable key(ClassLoader[] cls) throws IOException, ClassNotFoundException {
if ( key!=null ) return key;
if ( keydata == null || keydata.length == 0 ) return null;
key = XByteBuffer.deserialize(keydata,0,keydata.length,cls);
keydata = null;
return key;
public byte[] getKeyData() {
return keydata;
public Serializable getValue() {
try {
return value(null);
} catch ( Exception x ) {
log.error("Deserialization error of the MapMessage.value",x);
return null;
public Serializable value(ClassLoader[] cls) throws IOException, ClassNotFoundException {
if ( value!=null ) return value;
if ( valuedata == null || valuedata.length == 0 ) return null;
value = XByteBuffer.deserialize(valuedata,0,valuedata.length,cls);
valuedata = null;;
return value;
public byte[] getValueData() {
return valuedata;
public byte[] getDiffValue() {
return diffvalue;
public Member[] getBackupNodes() {
return nodes;
private void setBackUpNodes(Member[] nodes) {
this.nodes = nodes;
public byte[] getMapId() {
return mapId;
public void setValue(Serializable value) {
try {
if ( value != null ) valuedata = XByteBuffer.serialize(value);
this.value = value;
}catch ( IOException x ) {
throw new RuntimeException(x);
public void setKey(Serializable key) {
try {
if (key != null) keydata = XByteBuffer.serialize(key);
this.key = key;
} catch (IOException x) {
throw new RuntimeException(x);
protected Member[] readMembers(ObjectInput in) throws IOException, ClassNotFoundException {
int nodecount = in.readInt();
Member[] members = new Member[nodecount];
for ( int i=0; i<members.length; i++ ) {
byte[] d = new byte[in.readInt()];;
if (d.length > 0) members[i] = MemberImpl.getMember(d);
return members;
protected void writeMembers(ObjectOutput out,Member[] members) throws IOException {
if ( members == null ) members = new Member[0];
for (int i=0; i<members.length; i++ ) {
if ( members[i] != null ) {
byte[] d = members[i] != null ? ( (MemberImpl)members[i]).getData(false) : new byte[0];
* shallow clone
* @return Object
public Object clone() {
MapMessage msg = new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.nodes);
msg.keydata = this.keydata;
msg.valuedata = this.valuedata;
return msg;
} //MapMessage
public Channel getChannel() {
return channel;
public byte[] getMapContextName() {
return mapContextName;
public RpcChannel getRpcChannel() {
return rpcChannel;
public long getRpcTimeout() {
return rpcTimeout;
public Object getStateMutex() {
return stateMutex;
public boolean isStateTransferred() {
return stateTransferred;
public Object getMapOwner() {
return mapOwner;
public ClassLoader[] getExternalLoaders() {
return externalLoaders;
public int getChannelSendOptions() {
return channelSendOptions;
public long getAccessTimeout() {
return accessTimeout;
public void setMapOwner(Object mapOwner) {
this.mapOwner = mapOwner;
public void setExternalLoaders(ClassLoader[] externalLoaders) {
this.externalLoaders = externalLoaders;
public void setChannelSendOptions(int channelSendOptions) {
this.channelSendOptions = channelSendOptions;
public void setAccessTimeout(long accessTimeout) {
this.accessTimeout = accessTimeout;