| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.catalina.tribes.membership; |
| |
| import java.io.IOException; |
| import java.io.ObjectInput; |
| import java.io.ObjectOutput; |
| import java.util.Arrays; |
| |
| import org.apache.catalina.tribes.Member; |
| import org.apache.catalina.tribes.io.XByteBuffer; |
| import org.apache.catalina.tribes.transport.SenderState; |
| |
| /** |
| * A <b>membership</b> implementation using simple multicast. |
| * This is the representation of a multicast member. |
| * Carries the host and port of this or another cluster node. |
| * |
| * @author Filip Hanik |
| * @version $Revision$, $Date$ |
| */ |
| public class MemberImpl implements Member, java.io.Externalizable { |
| |
| /** |
| * Public properties specific to this implementation |
| */ |
| public static final transient String TCP_LISTEN_PORT = "tcpListenPort"; |
| public static final transient String TCP_LISTEN_HOST = "tcpListenHost"; |
| public static final transient String MEMBER_NAME = "memberName"; |
| |
| public static final transient byte[] TRIBES_MBR_BEGIN = new byte[] {84, 82, 73, 66, 69, 83, 45, 66}; |
| public static final transient byte[] TRIBES_MBR_END = new byte[] {84, 82, 73, 66, 69, 83, 45, 69}; |
| |
| /** |
| * The listen host for this member |
| */ |
| protected byte[] host; |
| protected transient String hostname; |
| /** |
| * The tcp listen port for this member |
| */ |
| protected int port; |
| |
| /** |
| * Counter for how many broadcast messages have been sent from this member |
| */ |
| protected int msgCount = 0; |
| /** |
| * The number of milliseconds since this members was |
| * created, is kept track of using the start time |
| */ |
| protected long memberAliveTime = 0; |
| |
| /** |
| * For the local member only |
| */ |
| protected transient long serviceStartTime; |
| |
| /** |
| * To avoid serialization over and over again, once the local dataPkg |
| * has been set, we use that to transmit data |
| */ |
| protected transient byte[] dataPkg = null; |
| |
| /** |
| * Unique session Id for this member |
| */ |
| protected byte[] uniqueId = new byte[16]; |
| |
| /** |
| * Custom payload that an app framework can broadcast |
| * Also used to transport stop command. |
| */ |
| protected byte[] payload = new byte[0]; |
| |
| /** |
| * Command, so that the custom payload doesn't have to be used |
| * This is for internal tribes use, such as SHUTDOWN_COMMAND |
| */ |
| protected byte[] command = new byte[0]; |
| |
| /** |
| * Domain if we want to filter based on domain. |
| */ |
| protected byte[] domain = new byte[0]; |
| |
/**
 * No-argument constructor used for serialization; every field keeps its
 * declared default.
 */
public MemberImpl() {
    super();
}
| |
/**
 * Construct a new member object.
 * @param host - the tcp listen host, applied through {@link #setHostname(String)}
 * @param port - the tcp listen port
 * @param aliveTime - initial value of the member alive time
 * @throws IOException if {@link #setHostname(String)} fails for the given host
 */
public MemberImpl(String host,
                  int port,
                  long aliveTime) throws IOException {
    this.setHostname(host);
    this.memberAliveTime = aliveTime;
    this.port = port;
}
| |
/**
 * Construct a new member object that also carries a payload.
 * @param host - the tcp listen host
 * @param port - the tcp listen port
 * @param aliveTime - initial value of the member alive time
 * @param payload - payload handed to {@link #setPayload(byte[])}
 * @throws IOException if the three-argument constructor fails for the given host
 */
public MemberImpl(String host,
                  int port,
                  long aliveTime,
                  byte[] payload) throws IOException {
    this(host, port, aliveTime);
    this.setPayload(payload);
}
| |
| public boolean isReady() { |
| return SenderState.getSenderState(this).isReady(); |
| } |
| public boolean isSuspect() { |
| return SenderState.getSenderState(this).isSuspect(); |
| } |
| public boolean isFailing() { |
| return SenderState.getSenderState(this).isFailing(); |
| } |
| |
| /** |
| * Increment the message count. |
| */ |
| protected void inc() { |
| msgCount++; |
| } |
| |
| /** |
| * Create a data package to send over the wire representing this member. |
| * This is faster than serialization. |
| * @return - the bytes for this member deserialized |
| * @throws Exception |
| */ |
| public byte[] getData() { |
| return getData(true); |
| } |
| /** |
| * Highly optimized version of serializing a member into a byte array |
| * Returns a cached byte[] reference, do not modify this data |
| * @param getalive boolean |
| * @return byte[] |
| */ |
| public byte[] getData(boolean getalive) { |
| return getData(getalive,false); |
| } |
| |
| |
| public int getDataLength() { |
| return TRIBES_MBR_BEGIN.length+ //start pkg |
| 4+ //data length |
| 8+ //alive time |
| 4+ //port |
| 1+ //host length |
| host.length+ //host |
| 4+ //command length |
| command.length+ //command |
| 4+ //domain length |
| domain.length+ //domain |
| 16+ //unique id |
| 4+ //payload length |
| payload.length+ //payload |
| TRIBES_MBR_END.length; //end pkg |
| } |
| |
| /** |
| * |
| * @param getalive boolean - calculate memberAlive time |
| * @param reset boolean - reset the cached data package, and create a new one |
| * @return byte[] |
| */ |
| public byte[] getData(boolean getalive, boolean reset) { |
| if ( reset ) dataPkg = null; |
| //look in cache first |
| if ( dataPkg!=null ) { |
| if ( getalive ) { |
| //you'd be surprised, but System.currentTimeMillis |
| //shows up on the profiler |
| long alive=System.currentTimeMillis()-getServiceStartTime(); |
| XByteBuffer.toBytes( (long) alive, dataPkg, TRIBES_MBR_BEGIN.length+4); |
| } |
| return dataPkg; |
| } |
| |
| //package looks like |
| //start package TRIBES_MBR_BEGIN.length |
| //package length - 4 bytes |
| //alive - 8 bytes |
| //port - 4 bytes |
| //host length - 1 byte |
| //host - hl bytes |
| //clen - 4 bytes |
| //command - clen bytes |
| //dlen - 4 bytes |
| //domain - dlen bytes |
| //uniqueId - 16 bytes |
| //payload length - 4 bytes |
| //payload plen bytes |
| //end package TRIBES_MBR_END.length |
| byte[] addr = host; |
| long alive=System.currentTimeMillis()-getServiceStartTime(); |
| byte hl = (byte)addr.length; |
| byte[] data = new byte[getDataLength()]; |
| |
| int bodylength = (getDataLength() - TRIBES_MBR_BEGIN.length - TRIBES_MBR_END.length - 4); |
| |
| int pos = 0; |
| |
| //TRIBES_MBR_BEGIN |
| System.arraycopy(TRIBES_MBR_BEGIN,0,data,pos,TRIBES_MBR_BEGIN.length); |
| pos += TRIBES_MBR_BEGIN.length; |
| |
| //body length |
| XByteBuffer.toBytes(bodylength,data,pos); |
| pos += 4; |
| |
| //alive data |
| XByteBuffer.toBytes((long)alive,data,pos); |
| pos += 8; |
| //port |
| XByteBuffer.toBytes(port,data,pos); |
| pos += 4; |
| //host length |
| data[pos++] = hl; |
| //host |
| System.arraycopy(addr,0,data,pos,addr.length); |
| pos+=addr.length; |
| //clen - 4 bytes |
| XByteBuffer.toBytes(command.length,data,pos); |
| pos+=4; |
| //command - clen bytes |
| System.arraycopy(command,0,data,pos,command.length); |
| pos+=command.length; |
| //dlen - 4 bytes |
| XByteBuffer.toBytes(domain.length,data,pos); |
| pos+=4; |
| //domain - dlen bytes |
| System.arraycopy(domain,0,data,pos,domain.length); |
| pos+=domain.length; |
| //unique Id |
| System.arraycopy(uniqueId,0,data,pos,uniqueId.length); |
| pos+=uniqueId.length; |
| //payload |
| XByteBuffer.toBytes(payload.length,data,pos); |
| pos+=4; |
| System.arraycopy(payload,0,data,pos,payload.length); |
| pos+=payload.length; |
| |
| //TRIBES_MBR_END |
| System.arraycopy(TRIBES_MBR_END,0,data,pos,TRIBES_MBR_END.length); |
| pos += TRIBES_MBR_END.length; |
| |
| //create local data |
| dataPkg = data; |
| return data; |
| } |
| /** |
| * Deserializes a member from data sent over the wire |
| * @param data - the bytes received |
| * @return a member object. |
| */ |
| public static MemberImpl getMember(byte[] data, MemberImpl member) { |
| return getMember(data,0,data.length,member); |
| } |
| |
| public static MemberImpl getMember(byte[] data, int offset, int length, MemberImpl member) { |
| //package looks like |
| //start package TRIBES_MBR_BEGIN.length |
| //package length - 4 bytes |
| //alive - 8 bytes |
| //port - 4 bytes |
| //host length - 1 byte |
| //host - hl bytes |
| //clen - 4 bytes |
| //command - clen bytes |
| //dlen - 4 bytes |
| //domain - dlen bytes |
| //uniqueId - 16 bytes |
| //payload length - 4 bytes |
| //payload plen bytes |
| //end package TRIBES_MBR_END.length |
| |
| int pos = offset; |
| |
| if (XByteBuffer.firstIndexOf(data,offset,TRIBES_MBR_BEGIN)!=pos) { |
| throw new IllegalArgumentException("Invalid package, should start with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_BEGIN)); |
| } |
| |
| if ( length < (TRIBES_MBR_BEGIN.length+4) ) { |
| throw new ArrayIndexOutOfBoundsException("Member package to small to validate."); |
| } |
| |
| pos += TRIBES_MBR_BEGIN.length; |
| |
| int bodylength = XByteBuffer.toInt(data,pos); |
| pos += 4; |
| |
| if ( length < (bodylength+4+TRIBES_MBR_BEGIN.length+TRIBES_MBR_END.length) ) { |
| throw new ArrayIndexOutOfBoundsException("Not enough bytes in member package."); |
| } |
| |
| int endpos = pos+bodylength; |
| if (XByteBuffer.firstIndexOf(data,endpos,TRIBES_MBR_END)!=endpos) { |
| throw new IllegalArgumentException("Invalid package, should end with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_END)); |
| } |
| |
| |
| byte[] alived = new byte[8]; |
| System.arraycopy(data, pos, alived, 0, 8); |
| pos += 8; |
| byte[] portd = new byte[4]; |
| System.arraycopy(data, pos, portd, 0, 4); |
| pos += 4; |
| |
| byte hl = data[pos++]; |
| byte[] addr = new byte[hl]; |
| System.arraycopy(data, pos, addr, 0, hl); |
| pos += hl; |
| |
| int cl = XByteBuffer.toInt(data, pos); |
| pos += 4; |
| |
| byte[] command = new byte[cl]; |
| System.arraycopy(data, pos, command, 0, command.length); |
| pos += command.length; |
| |
| int dl = XByteBuffer.toInt(data, pos); |
| pos += 4; |
| |
| byte[] domain = new byte[dl]; |
| System.arraycopy(data, pos, domain, 0, domain.length); |
| pos += domain.length; |
| |
| byte[] uniqueId = new byte[16]; |
| System.arraycopy(data, pos, uniqueId, 0, 16); |
| pos += 16; |
| |
| int pl = XByteBuffer.toInt(data, pos); |
| pos += 4; |
| |
| byte[] payload = new byte[pl]; |
| System.arraycopy(data, pos, payload, 0, payload.length); |
| pos += payload.length; |
| |
| member.setHost(addr); |
| member.setPort(XByteBuffer.toInt(portd, 0)); |
| member.setMemberAliveTime(XByteBuffer.toLong(alived, 0)); |
| member.setUniqueId(uniqueId); |
| member.payload = payload; |
| member.domain = domain; |
| member.command = command; |
| |
| member.dataPkg = new byte[length]; |
| System.arraycopy(data, offset, member.dataPkg, 0, length); |
| |
| return member; |
| } |
| |
| public static MemberImpl getMember(byte[] data) { |
| return getMember(data,new MemberImpl()); |
| } |
| |
| /** |
| * Return the name of this object |
| * @return a unique name to the cluster |
| */ |
| public String getName() { |
| return "tcp://"+getHostname()+":"+getPort(); |
| } |
| |
| /** |
| * Return the listen port of this member |
| * @return - tcp listen port |
| */ |
| public int getPort() { |
| return this.port; |
| } |
| |
| /** |
| * Return the TCP listen host for this member |
| * @return IP address or host name |
| */ |
| public byte[] getHost() { |
| return host; |
| } |
| |
| public String getHostname() { |
| if ( this.hostname != null ) return hostname; |
| else { |
| try { |
| this.hostname = java.net.InetAddress.getByAddress(host).getHostName(); |
| return this.hostname; |
| }catch ( IOException x ) { |
| throw new RuntimeException("Unable to parse hostname.",x); |
| } |
| } |
| } |
| |
| /** |
| * Contains information on how long this member has been online. |
| * The result is the number of milli seconds this member has been |
| * broadcasting its membership to the cluster. |
| * @return nr of milliseconds since this member started. |
| */ |
| public long getMemberAliveTime() { |
| return memberAliveTime; |
| } |
| |
| public long getServiceStartTime() { |
| return serviceStartTime; |
| } |
| |
| public byte[] getUniqueId() { |
| return uniqueId; |
| } |
| |
| public byte[] getPayload() { |
| return payload; |
| } |
| |
| public byte[] getCommand() { |
| return command; |
| } |
| |
| public byte[] getDomain() { |
| return domain; |
| } |
| |
| public void setMemberAliveTime(long time) { |
| memberAliveTime=time; |
| } |
| |
| |
| |
| /** |
| * String representation of this object |
| */ |
| public String toString() { |
| StringBuffer buf = new StringBuffer("org.apache.catalina.tribes.membership.MemberImpl["); |
| buf.append(getName()).append(","); |
| buf.append(getHostname()).append(","); |
| buf.append(port).append(", alive="); |
| buf.append(memberAliveTime).append(","); |
| buf.append("id=").append(bToS(this.uniqueId)).append(", "); |
| buf.append("payload=").append(bToS(this.payload,8)).append(", "); |
| buf.append("command=").append(bToS(this.command,8)).append(", "); |
| buf.append("domain=").append(bToS(this.domain,8)).append(", "); |
| buf.append("]"); |
| return buf.toString(); |
| } |
| public static String bToS(byte[] data) { |
| return bToS(data,data.length); |
| } |
| public static String bToS(byte[] data, int max) { |
| StringBuffer buf = new StringBuffer(4*16); |
| buf.append("{"); |
| for (int i=0; data!=null && i<data.length; i++ ) { |
| buf.append(String.valueOf(data[i])).append(" "); |
| if ( i==max ) { |
| buf.append("...("+data.length+")"); |
| break; |
| } |
| } |
| buf.append("}"); |
| return buf.toString(); |
| } |
| |
| /** |
| * @see java.lang.Object#hashCode() |
| * @return The hash code |
| */ |
| public int hashCode() { |
| return getHost()[0]+getHost()[1]+getHost()[2]+getHost()[3]; |
| } |
| |
| /** |
| * Returns true if the param o is a McastMember with the same name |
| * @param o |
| */ |
| public boolean equals(Object o) { |
| if ( o instanceof MemberImpl ) { |
| return Arrays.equals(this.getHost(),((MemberImpl)o).getHost()) && |
| this.getPort() == ((MemberImpl)o).getPort() && |
| Arrays.equals(this.getUniqueId(),((MemberImpl)o).getUniqueId()); |
| } |
| else |
| return false; |
| } |
| |
| public void setHost(byte[] host) { |
| this.host = host; |
| } |
| |
| public void setHostname(String host) throws IOException { |
| hostname = host; |
| this.host = java.net.InetAddress.getByName(host).getAddress(); |
| } |
| |
| public void setMsgCount(int msgCount) { |
| this.msgCount = msgCount; |
| } |
| |
| public void setPort(int port) { |
| this.port = port; |
| this.dataPkg = null; |
| } |
| |
| public void setServiceStartTime(long serviceStartTime) { |
| this.serviceStartTime = serviceStartTime; |
| } |
| |
| public void setUniqueId(byte[] uniqueId) { |
| this.uniqueId = uniqueId!=null?uniqueId:new byte[16]; |
| getData(true,true); |
| } |
| |
| public void setPayload(byte[] payload) { |
| byte[] oldpayload = this.payload; |
| this.payload = payload!=null?payload:new byte[0]; |
| if ( this.getData(true,true).length > McastServiceImpl.MAX_PACKET_SIZE ) { |
| this.payload = oldpayload; |
| throw new IllegalArgumentException("Payload is to large for tribes to handle."); |
| } |
| |
| } |
| |
| public void setCommand(byte[] command) { |
| this.command = command!=null?command:new byte[0]; |
| getData(true,true); |
| } |
| |
| public void setDomain(byte[] domain) { |
| this.domain = domain!=null?domain:new byte[0]; |
| getData(true,true); |
| } |
| |
| public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| int length = in.readInt(); |
| byte[] message = new byte[length]; |
| in.read(message); |
| getMember(message,this); |
| |
| } |
| |
| public void writeExternal(ObjectOutput out) throws IOException { |
| byte[] data = this.getData(); |
| out.writeInt(data.length); |
| out.write(data); |
| } |
| |
| } |