| /*========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache.tier.sockets; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.Socket; |
| import java.net.SocketTimeoutException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.SocketChannel; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import com.gemstone.gemfire.SerializationException; |
| import com.gemstone.gemfire.internal.Assert; |
| import com.gemstone.gemfire.internal.HeapDataOutputStream; |
| import com.gemstone.gemfire.internal.SocketUtils; |
| import com.gemstone.gemfire.internal.Version; |
| import com.gemstone.gemfire.internal.cache.CachedDeserializable; |
| import com.gemstone.gemfire.internal.cache.TXManagerImpl; |
| import com.gemstone.gemfire.internal.cache.tier.MessageType; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.LogService; |
| import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; |
| import com.gemstone.gemfire.internal.offheap.StoredObject; |
| import com.gemstone.gemfire.internal.offheap.annotations.Retained; |
| import com.gemstone.gemfire.internal.offheap.annotations.Unretained; |
| import com.gemstone.gemfire.internal.util.BlobHelper; |
| |
| /** |
| * This class encapsulates the wire protocol. It provides accessors to |
| * encode and decode a message and serialize it out to the wire. |
| * |
| * <PRE> |
| * msgType - int - 4 bytes type of message, types enumerated below |
| * |
| * msgLength - int - 4 bytes total length of variable length payload |
| * |
| * numberOfParts - int - 4 bytes number of elements (LEN-BYTE* pairs) |
| * contained in the payload. Message can |
| * be a multi-part message |
| * |
| * transId - int - 4 bytes filled in by the requestor, copied back into |
| * the response |
| * |
| * earlyAck - byte- 1 byte filled in by the requestor |
| * len1 |
| * part1 |
| * . |
| * . |
| * . |
| * lenn |
| * partn |
| * </PRE> |
| * |
| * We read the fixed length 16 bytes into a byte[] and populate a bytebuffer |
| * We read the fixed length header tokens from the header |
| * parse the header and use information contained in there to read the payload. |
| * |
| * <P> |
| * |
| * See also <a href="package-summary.html#messages">package description</a>. |
| * |
| * @see com.gemstone.gemfire.internal.cache.tier.MessageType |
| * |
| * @author Sudhir Menon |
| * @since 2.0.2 |
| */ |
| public class Message { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| @Override |
| public String toString() { |
| StringBuffer sb = new StringBuffer(); |
| sb.append("type=" + MessageType.getString(msgType)); |
| sb.append("; payloadLength=" + payloadLength); |
| sb.append("; numberOfParts=" + numberOfParts); |
| sb.append("; transactionId=" + transactionId); |
| //sb.append("; bufferLength=" + bufferLength); |
| sb.append("; currentPart=" + currentPart); |
| sb.append("; messageModified=" + messageModified); |
| sb.append("; earlyAck=" + earlyAck); |
| for (int i = 0; i < numberOfParts; i ++) { |
| sb.append("; part[" + i + "]={"); |
| sb.append(this.partsList[i].toString()); |
| sb.append("}"); |
| } |
| return sb.toString(); |
| } |
| |
| protected final static int FIXED_LENGTH = 17; |
| protected int msgType; |
| protected int payloadLength=0; |
| protected int numberOfParts =0; |
| protected int transactionId = TXManagerImpl.NOTX; |
| protected int currentPart = 0; |
| protected Part[] partsList = null; |
| protected ByteBuffer cachedCommBuffer; |
| protected Socket socket = null; |
| protected SocketChannel sockCh = null; |
| protected OutputStream os = null; |
| protected InputStream is = null; |
| protected boolean messageModified = true; |
| /** is this message a retry of a previously sent message? */ |
| protected boolean isRetry; |
| private byte earlyAck = 0x00; |
| protected MessageStats msgStats = null; |
| protected ServerConnection sc = null; |
| private int MAX_DATA = -1; |
| private Semaphore dataLimiter = null; |
| // private int MAX_MSGS = -1; |
| private Semaphore msgLimiter = null; |
| private boolean hdrRead = false; |
| private int chunkSize = 1024;//Default Chunk Size. |
| |
| protected Part securePart = null; |
| |
| // These two statics are fields shoved into the earlyAck byte for transmission. |
| // The MESSAGE_IS_RETRY bit is stripped out during deserialization but the other |
| // is left in place |
| public static final byte MESSAGE_HAS_SECURE_PART = (byte)0x02; |
| public static final byte MESSAGE_IS_RETRY = (byte)0x04; |
| |
| public static final byte MESSAGE_IS_RETRY_MASK = (byte)0xFB; |
| |
| // Tentative workaround to avoid OOM stated in #46754. |
| public static final ThreadLocal<Integer> messageType = new ThreadLocal<Integer>(); |
| |
| Version destVersion; |
| |
| /** |
| * Creates a new message with the given number of parts |
| */ |
| public Message(int numberOfParts, Version destVersion) { |
| this.destVersion = destVersion; |
| Assert.assertTrue(destVersion != null, "Attempt to create an unversioned message"); |
| partsList = new Part[numberOfParts]; |
| this.numberOfParts = numberOfParts; |
| for (int i=0;i<partsList.length;i++) { |
| partsList[i] = new Part(); |
| } |
| } |
| |
| public boolean isSecureMode() { |
| return securePart != null; |
| } |
| |
| public byte[] getSecureBytes() |
| throws IOException, ClassNotFoundException { |
| return (byte[])this.securePart.getObject(); |
| } |
| |
| public void setMessageType(int msgType) { |
| this.messageModified = true; |
| if (!MessageType.validate(msgType)) { |
| throw new IllegalArgumentException(LocalizedStrings.Message_INVALID_MESSAGETYPE.toLocalizedString()); |
| } |
| this.msgType = msgType; |
| } |
| |
| public void setVersion(Version clientVersion) { |
| this.destVersion = clientVersion; |
| } |
| |
| /** |
| * Sets whether this message is early-ack |
| * @param earlyAck whether this message is early-ack |
| */ |
| public void setEarlyAck(boolean earlyAck) { |
| if (earlyAck) { |
| this.earlyAck = 0x01; |
| } else { |
| this.earlyAck = 0x00; |
| } |
| } |
| |
| // TODO (ashetkar) To be removed later. |
| public void setEarlyAck(byte earlyAck) { |
| // Check that the passed in value is within the acceptable range. |
| if (0x00 <= earlyAck && earlyAck <= 0x02) { |
| this.earlyAck |= earlyAck; |
| } |
| } |
| |
| /* |
| * public void setPayloadLength(int payloadLength) { |
| this.payloadLength = payloadLength; |
| }*/ |
| |
| /** |
| * Sets and builds the {@link Part}s that are sent |
| * in the payload of the Message |
| * @param numberOfParts |
| */ |
| public void setNumberOfParts(int numberOfParts) { |
| //TODO:hitesh need to add security header here from server |
| //need to insure it is not chunked message |
| //should we look message type to avoid internal message like ping |
| this.messageModified = true; |
| this.currentPart=0; |
| this.numberOfParts = numberOfParts; |
| if (numberOfParts > this.partsList.length) { |
| Part[] newPartsList = new Part[numberOfParts]; |
| for (int i=0;i<numberOfParts;i++) { |
| if (i < this.partsList.length) { |
| newPartsList[i] = this.partsList[i]; |
| } else { |
| newPartsList[i] = new Part(); |
| } |
| } |
| this.partsList = newPartsList; |
| } |
| } |
| |
| public void setTransactionId(int transactionId) { |
| this.messageModified = true; |
| this.transactionId = transactionId; |
| } |
| |
| public void setIsRetry() { |
| this.isRetry = true; |
| } |
| |
| /** |
| * This returns true if the message has been marked as having been previously |
| * transmitted to a different server. |
| */ |
| public boolean isRetry() { |
| return this.isRetry; |
| } |
| |
| /*Sets size for HDOS chunk.*/ |
| public void setChunkSize(int chunkSize) { |
| this.chunkSize = chunkSize; |
| } |
| |
| public void addStringPart(String str) { |
| if (str==null) { |
| addRawPart((byte[])null, false); |
| } |
| else { |
| HeapDataOutputStream hdos = new HeapDataOutputStream(str); |
| this.messageModified = true; |
| Part part = partsList[this.currentPart]; |
| part.setPartState(hdos, false); |
| this.currentPart++; |
| } |
| } |
| |
| /** |
| * Sets whether or not a |
| * <code>DataOutputStream</code>/<code>DataOutputStream</code> |
| * should be used to send/receive data. |
| public void setUseDataStream (boolean useDataStream) { |
| this.useDataStream = useDataStream; |
| } |
| */ |
| |
| /* |
| * Adds a new part to this message that contains a <code>byte</code> |
| * array (as opposed to a serialized object). |
| * |
| * @see #addPart(byte[], boolean) |
| */ |
| public void addBytesPart(byte[] newPart) { |
| addRawPart(newPart, false); |
| } |
| |
| public void addStringOrObjPart(Object o) { |
| if (o instanceof String || o == null) { |
| addStringPart((String)o); |
| } else { |
| // Note even if o is a byte[] we need to serialize it. |
| // This could be cleaned up but it would require C client code to change. |
| serializeAndAddPart(o, false); |
| } |
| } |
| |
| public void addDeltaPart(HeapDataOutputStream hdos) { // TODO: Amogh- Should it be just DataOutput? |
| this.messageModified = true; |
| Part part = partsList[this.currentPart]; |
| part.setPartState(hdos, false); |
| this.currentPart++; |
| } |
| |
| public void addObjPart(Object o) { |
| addObjPart(o, false); |
| } |
| /** |
| * Like addObjPart(Object) but also prefers to reference |
| * objects in the part instead of copying them into a byte buffer. |
| */ |
| public void addObjPartNoCopying(Object o) { |
| if (o == null || o instanceof byte[]) { |
| addRawPart((byte[])o, false); |
| } else { |
| serializeAndAddPartNoCopying(o); |
| } |
| } |
| public void addObjPart(Object o, boolean zipValues) { |
| if (o == null || o instanceof byte[]) { |
| addRawPart((byte[])o, false); |
| } else { |
| serializeAndAddPart(o, zipValues); |
| } |
| } |
| public void addPartInAnyForm(@Unretained Object o, boolean isObject) { |
| if (o == null) { |
| addRawPart((byte[])o, false); |
| } else if (o instanceof byte[]) { |
| addRawPart((byte[])o, isObject); |
| } else if (o instanceof StoredObject) { |
| // It is possible it is an off-heap StoredObject that contains a simple non-object byte[]. |
| this.messageModified = true; |
| Part part = partsList[this.currentPart]; |
| part.setPartState((StoredObject)o, isObject); |
| this.currentPart++; |
| } else { |
| serializeAndAddPart(o, false); |
| } |
| } |
| |
| private void serializeAndAddPartNoCopying(Object o) { |
| HeapDataOutputStream hdos; |
| Version v = destVersion; |
| if (destVersion.equals(Version.CURRENT)){ |
| v = null; |
| } |
| // create the HDOS with a flag telling it that it can keep any byte[] or ByteBuffers/ByteSources passed to it. |
| hdos = new HeapDataOutputStream(chunkSize, v, true); |
| // TODO OFFHEAP: Change Part to look for an HDOS and just pass a reference to its DirectByteBuffer. |
| // Then change HDOS sendTo(SocketChannel...) to use the GatheringByteChannel to write a bunch of bbs. |
| // TODO OFFHEAP This code optimizes one part which works pretty good for getAll since all the values are |
| // returned in one part. But the following seems even better... |
| // BETTER: change Message to consolidate all the part hdos bb lists into a single bb array and have it do the GatheringByteChannel write. |
| // Message can use slice for the small parts (msg header and part header) that are not in the parts data (its a byte array, Chunk, or HDOS). |
| // EVEN BETTER: the message can have a single HDOS which owns a direct comm buffer. It can reserve space if it does not yet know the value to write (for example the size of the message or part). |
| // If we write something to the HDOS that is direct then it does not need to be copied. |
| // But large heap byte arrays will need to be copied to the hdos (the socket write does this anyway). |
| // If the direct buffer is full then we can allocate another one. If a part is already in a heap byte array |
| // then we could defer copying it by slicing the current direct bb and then adding the heap byte array |
| // as bb using ByteBuffer.wrap. Once we have all the data in the HDOS we can finally generate the header |
| // and then start working on sending the ByteBuffers to the channel. If we have room in a direct bb then |
| // we can copy a heap bb to it. Otherwise we can write the bb ahead of it which would free up room to copy |
| // the heap bb to the existing direct bb without needing to allocate extra direct bbs. |
| // Delaying the flush uses more direct memory but reduces the number of system calls. |
| try { |
| // logger.fine("hitesh before serializatino: " ); |
| // |
| // if (o != null ){ |
| // logger.fine("hitesh before serializatino: " + o.toString()); |
| // logger.fine("hitesh before serializatino: " + o.getClass().getName()); |
| // } |
| BlobHelper.serializeTo(o, hdos); |
| } catch (IOException ex) { |
| throw new SerializationException("failed serializing object", ex); |
| } |
| this.messageModified = true; |
| Part part = partsList[this.currentPart]; |
| part.setPartState(hdos, true); |
| this.currentPart++; |
| |
| } |
| |
| private void serializeAndAddPart(Object o, boolean zipValues) { |
| if (zipValues) { |
| throw new UnsupportedOperationException("zipValues no longer supported"); |
| |
| // byte[] b = CacheServerHelper.serialize(o, zipValues); |
| // addRawPart(b, true); |
| } else { |
| HeapDataOutputStream hdos; |
| Version v = destVersion; |
| if (destVersion.equals(Version.CURRENT)){ |
| v = null; |
| } |
| hdos = new HeapDataOutputStream(chunkSize, v); |
| try { |
| // logger.fine("hitesh before serializatino: " ); |
| // |
| // if (o != null ){ |
| // logger.fine("hitesh before serializatino: " + o.toString()); |
| // logger.fine("hitesh before serializatino: " + o.getClass().getName()); |
| // } |
| BlobHelper.serializeTo(o, hdos); |
| } catch (IOException ex) { |
| throw new SerializationException("failed serializing object", ex); |
| } |
| this.messageModified = true; |
| Part part = partsList[this.currentPart]; |
| part.setPartState(hdos, true); |
| this.currentPart++; |
| } |
| } |
| |
| public void addIntPart(int v) { |
| this.messageModified = true; |
| Part part = partsList[this.currentPart]; |
| part.setInt(v); |
| this.currentPart++; |
| } |
| |
| public void addLongPart(long v) { |
| this.messageModified = true; |
| Part part = partsList[this.currentPart]; |
| part.setLong(v); |
| this.currentPart++; |
| } |
| |
| /** |
| * Adds a new part to this message that may contain a serialized |
| * object. |
| */ |
| public void addRawPart(byte[] newPart,boolean isObject) { |
| this.messageModified = true; |
| Part part = partsList[this.currentPart]; |
| part.setPartState(newPart, isObject); |
| this.currentPart++; |
| } |
| |
| public int getMessageType() { |
| return this.msgType; |
| } |
| |
| public int getPayloadLength() { |
| return this.payloadLength; |
| } |
| |
| public int getHeaderLength() { |
| return FIXED_LENGTH; |
| } |
| |
| public int getNumberOfParts() { |
| return this.numberOfParts; |
| } |
| |
| public int getTransactionId() { |
| return this.transactionId; |
| } |
| |
| public Part getPart(int index) { |
| if (index < this.numberOfParts) |
| return partsList[index]; |
| return null; |
| } |
| |
| public boolean getEarlyAck() { |
| return this.earlyAck == 0x01 ? true: false; |
| } |
| |
| // TODO (ashetkar) To be removed |
| public byte getEarlyAckByte() { |
| return this.earlyAck; |
| } |
| |
| private static ThreadLocal tlCommBuffer = new ThreadLocal(); |
| |
| public static ByteBuffer setTLCommBuffer(ByteBuffer bb) { |
| ByteBuffer result = (ByteBuffer)tlCommBuffer.get(); |
| tlCommBuffer.set(bb); |
| return result; |
| } |
| |
| public ByteBuffer getCommBuffer() { |
| if (this.cachedCommBuffer != null) { |
| return this.cachedCommBuffer; |
| } |
| else { |
| return (ByteBuffer)tlCommBuffer.get(); |
| } |
| } |
| |
| public void clear() { |
| this.isRetry = false; |
| int len = this.payloadLength; |
| if (len != 0) { |
| this.payloadLength = 0; |
| } |
| if (this.hdrRead) { |
| if (this.msgStats != null) { |
| this.msgStats.decMessagesBeingReceived(len); |
| } |
| } |
| if (this.socket != null) { |
| getCommBuffer().clear(); |
| } |
| clearParts(); |
| if (len != 0 && this.dataLimiter != null) { |
| this.dataLimiter.release(len); |
| this.dataLimiter = null; |
| this.MAX_DATA = 0; |
| } |
| if (this.hdrRead) { |
| if (this.msgLimiter != null) { |
| this.msgLimiter.release(1); |
| this.msgLimiter = null; |
| } |
| this.hdrRead = false; |
| } |
| } |
| |
| protected void packHeaderInfoForSending(int msgLen, boolean isSecurityHeader) { |
| //TODO:hitesh setting second bit of early ack for client |
| //this is not require but this makes all changes easily at client side right now |
| //just see this bit and process security header |
| byte eAck = this.earlyAck; |
| if (isSecurityHeader) { |
| eAck |= MESSAGE_HAS_SECURE_PART; |
| } |
| if (this.isRetry) { |
| eAck |= MESSAGE_IS_RETRY; |
| } |
| getCommBuffer() |
| .putInt(this.msgType) |
| .putInt(msgLen) |
| .putInt(this.numberOfParts) |
| .putInt(this.transactionId) |
| .put(eAck); |
| } |
| |
| private static final int PART_HEADER_SIZE = 5; // 4 bytes for length, 1 byte for isObject |
| |
| protected Part getSecurityPart() { |
| if (this.sc != null ) { |
| //look types right put get etc |
| return this.sc.updateAndGetSecurityPart(); |
| } |
| return null; |
| } |
| |
| public void setSecurePart(byte[] bytes) { |
| this.securePart = new Part(); |
| this.securePart.setPartState(bytes, false); |
| } |
| |
| private boolean m_isMetaRegion = false; |
| |
| public void setMetaRegion(boolean isMetaRegion) { |
| this.m_isMetaRegion = isMetaRegion; |
| } |
| |
| public boolean getAndResetIsMetaRegion() { |
| boolean isMetaRegion = this.m_isMetaRegion; |
| this.m_isMetaRegion = false; |
| return isMetaRegion; |
| } |
| |
| /** |
| * Sends this message out on its socket. |
| */ |
| protected void sendBytes(boolean clearMessage) throws IOException { |
| if (this.sc != null) { |
| // Keep track of the fact that we are making progress. |
| this.sc.updateProcessingMessage(); |
| } |
| if (this.socket != null) { |
| final ByteBuffer cb = getCommBuffer(); |
| if (cb == null) { |
| throw new IOException("No buffer"); |
| } |
| synchronized(cb) { |
| int numOfSecureParts = 0; |
| Part securityPart = this.getSecurityPart(); |
| boolean isSecurityHeader = false; |
| |
| if (securityPart != null) { |
| isSecurityHeader = true; |
| numOfSecureParts = 1; |
| } |
| else if (this.securePart != null) { |
| // This is a client sending this message. |
| securityPart = this.securePart; |
| isSecurityHeader = true; |
| numOfSecureParts = 1; |
| } |
| |
| //this.logger.fine("hitesh sendbytes forServer_SecurityPart " + numOfSecureParts); |
| int totalPartLen = 0; |
| for (int i=0;i<this.numberOfParts;i++){ |
| Part part = this.partsList[i]; |
| totalPartLen += part.getLength(); |
| } |
| |
| if(numOfSecureParts == 1) { |
| totalPartLen += securityPart.getLength(); |
| } |
| int msgLen = (PART_HEADER_SIZE * (this.numberOfParts + numOfSecureParts)) + totalPartLen; |
| cb.clear(); |
| packHeaderInfoForSending(msgLen, isSecurityHeader); |
| for (int i=0;i<this.numberOfParts + numOfSecureParts;i++) { |
| Part part = null; |
| if(i == this.numberOfParts) { |
| part = securityPart; |
| } |
| else { |
| part = partsList[i]; |
| } |
| if (cb.remaining() < PART_HEADER_SIZE) { |
| flushBuffer(); |
| } |
| int partLen = part.getLength(); |
| cb.putInt(partLen); |
| cb.put(part.getTypeCode()); |
| if (partLen <= cb.remaining()) { |
| part.sendTo(cb); |
| } else { |
| flushBuffer(); |
| // send partBytes |
| if (this.sockCh != null) { |
| part.sendTo(this.sockCh, cb); |
| } else { |
| part.sendTo(this.os, cb); |
| } |
| if (this.msgStats != null) { |
| this.msgStats.incSentBytes(partLen); |
| } |
| } |
| } |
| if (cb.position() != 0) { |
| flushBuffer(); |
| } |
| this.messageModified = false; |
| if (this.sockCh == null) { |
| this.os.flush(); |
| } |
| } |
| if(clearMessage) { |
| clearParts(); |
| } |
| } |
| else { |
| throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString()); |
| } |
| } |
| |
| protected void flushBuffer() throws IOException { |
| final ByteBuffer cb = getCommBuffer(); |
| if (this.sockCh != null) { |
| cb.flip(); |
| do { |
| this.sockCh.write(cb); |
| } while (cb.remaining() > 0); |
| } else { |
| this.os.write(cb.array(), 0, cb.position()); |
| } |
| if (this.msgStats != null) { |
| this.msgStats.incSentBytes(cb.position()); |
| } |
| cb.clear(); |
| } |
| |
| private void read() |
| throws IOException { |
| clearParts(); |
| //TODO:Hitesh ??? for server changes make sure sc is not null as this class also used by client :( |
| readHeaderAndPayload(); |
| } |
| |
| /** |
| * Read the actual bytes of the header off the socket |
| */ |
| protected final void fetchHeader() throws IOException { |
| final ByteBuffer cb = getCommBuffer(); |
| cb.clear(); |
| // msgType is invalidated here and can be used as an indicator |
| // of problems reading the message |
| this.msgType = MessageType.INVALID; |
| |
| int hdr = 0; |
| |
| final int headerLength = getHeaderLength(); |
| if (this.sockCh != null) { |
| cb.limit(headerLength); |
| do { |
| int bytesRead = this.sockCh.read(cb); |
| //System.out.println("DEBUG: fetchHeader read " + bytesRead + " bytes commBuffer=" + cb); |
| if (bytesRead == -1) { |
| throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER.toLocalizedString()); |
| } |
| if (this.msgStats != null) { |
| this.msgStats.incReceivedBytes(bytesRead); |
| } |
| } while (cb.remaining() > 0); |
| cb.flip(); |
| } else { |
| do { |
| int bytesRead = -1; |
| try { |
| bytesRead = this.is.read(cb.array(),hdr, headerLength-hdr); |
| } |
| catch (SocketTimeoutException e) { |
| // bytesRead = 0; |
| // TODO add a cancellation check |
| throw e; |
| } |
| if (bytesRead == -1) { |
| throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER.toLocalizedString()); |
| } |
| hdr += bytesRead; |
| if (this.msgStats != null) { |
| this.msgStats.incReceivedBytes(bytesRead); |
| } |
| } while (hdr < headerLength); |
| |
| // now setup the commBuffer for the caller to parse it |
| cb.rewind(); |
| } |
| } |
| |
| private void readHeaderAndPayload() |
| throws IOException { |
| //TODO:Hitesh ??? |
| fetchHeader(); |
| final ByteBuffer cb = getCommBuffer(); |
| final int type = cb.getInt(); |
| final int len = cb.getInt(); |
| final int numParts = cb.getInt(); |
| final int txid = cb.getInt(); |
| byte early = cb.get(); |
| cb.clear(); |
| |
| if (!MessageType.validate(type)) { |
| throw new IOException(LocalizedStrings.Message_INVALID_MESSAGE_TYPE_0_WHILE_READING_HEADER.toLocalizedString(Integer.valueOf(type))); |
| } |
| int timeToWait = 0; |
| if (this.sc != null) { |
| // Keep track of the fact that a message is being processed. |
| this.sc.setProcessingMessage(); |
| timeToWait = sc.getClientReadTimeout(); |
| } |
| this.hdrRead = true; |
| if (this.msgLimiter != null) { |
| for (;;) { |
| this.sc.getCachedRegionHelper().checkCancelInProgress(null); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| if (timeToWait == 0) { |
| this.msgLimiter.acquire(1); |
| } |
| else { |
| if (!this.msgLimiter.tryAcquire(1, timeToWait, TimeUnit.MILLISECONDS)) { |
| if (this.msgStats != null |
| && this.msgStats instanceof CacheServerStats) { |
| ((CacheServerStats)this.msgStats).incConnectionsTimedOut(); |
| } |
| throw new IOException(LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_MESSAGE_LIMITER_AFTER_WAITING_0_MILLISECONDS.toLocalizedString(Integer.valueOf(timeToWait))); |
| } |
| } |
| break; |
| } |
| catch (InterruptedException e) { |
| interrupted = true; |
| } |
| finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| } |
| if (len > 0) { |
| if (this.MAX_DATA > 0 && len > this.MAX_DATA) { |
| throw new IOException(LocalizedStrings.Message_MESSAGE_SIZE_0_EXCEEDED_MAX_LIMIT_OF_1.toLocalizedString(new Object[] {Integer.valueOf(len), Integer.valueOf(this.MAX_DATA)})); |
| } |
| if (this.dataLimiter != null) { |
| for (;;) { |
| if (sc != null) { |
| this.sc.getCachedRegionHelper().checkCancelInProgress(null); |
| } |
| boolean interrupted = Thread.interrupted(); |
| try { |
| if (timeToWait == 0) { |
| this.dataLimiter.acquire(len); |
| } |
| else { |
| int newTimeToWait = timeToWait; |
| if (this.msgLimiter != null) { |
| // may have waited for msg limit so recalc time to wait |
| newTimeToWait -= (int)sc.getCurrentMessageProcessingTime(); |
| } |
| if (newTimeToWait <= 0 || !this.msgLimiter.tryAcquire(1, newTimeToWait, TimeUnit.MILLISECONDS)) { |
| throw new IOException(LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_DATA_LIMITER_AFTER_WAITING_0_MILLISECONDS.toLocalizedString(timeToWait)); |
| } |
| } |
| this.payloadLength = len; // makes sure payloadLength gets set now so we will release the semaphore |
| break; // success |
| } |
| catch (InterruptedException e) { |
| interrupted = true; |
| } |
| finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| } |
| if (this.msgStats != null) { |
| this.msgStats.incMessagesBeingReceived(len); |
| this.payloadLength = len; // makes sure payloadLength gets set now so we will dec on clear |
| } |
| |
| this.isRetry = (early & MESSAGE_IS_RETRY) != 0; |
| early = (byte)(early & MESSAGE_IS_RETRY_MASK); |
| |
| //TODO:hitesh it was below ?? |
| this.earlyAck = early; |
| this.msgType = type; |
| //this.logger.fine("Before reading message parts, earlyAck already read as " + this.earlyAck); |
| readPayloadFields(numParts, len); |
| |
| // Set the header and payload fields only after receiving all the |
| // socket data, providing better message consistency in the face |
| // of exceptional conditions (e.g. IO problems, timeouts etc.) |
| this.msgType = type; |
| this.payloadLength = len; |
| // this.numberOfParts = numParts; Already set in setPayloadFields via setNumberOfParts |
| this.transactionId = txid; |
| this.earlyAck = early; |
| if (this.sc != null) { |
| // Keep track of the fact that a message is being processed. |
| this.sc.updateProcessingMessage(); |
| } |
| } |
| |
| // static final int MAX_PART_BUFFERS = 2; |
| // static final int MIN_PART_BUFFER_SIZE = 999; |
| // static final int MAX_PART_BUFFER_SIZE = 1024*1024*11; |
| // static ArrayList partBuffers = new ArrayList(2); |
| // static int partBufferIdx = 0; |
| // static { |
| // for (int i=0; i < MAX_PART_BUFFERS; i++) { |
| // partBuffers.add(i, null); |
| // } |
| // } |
| |
| // private static synchronized byte[] getPartBuffer(int size) { |
| // byte[] result; |
| // synchronized (partBuffers) { |
| // result = (byte[])partBuffers.get(partBufferIdx); |
| // if (result == null) { |
| // result = new byte[size]; |
| // partBuffers.add(partBufferIdx, result); |
| // } else if (result.length != size) { |
| // // can't use a cached one |
| // return null; |
| // } |
| // partBufferIdx++; |
| // if (partBufferIdx >= MAX_PART_BUFFERS) { |
| // partBufferIdx = 0; |
| // } |
| // } |
| // return result; |
| // } |
| |
| protected void readPayloadFields(final int numParts, final int len) |
| throws IOException { |
| //TODO:Hitesh |
| if (len > 0 && numParts <= 0 || |
| len <= 0 && numParts > 0) { |
| throw new IOException(LocalizedStrings.Message_PART_LENGTH_0_AND_NUMBER_OF_PARTS_1_INCONSISTENT.toLocalizedString( |
| new Object[] {Integer.valueOf(len), Integer.valueOf(numParts)})); |
| } |
| |
| Integer msgType = messageType.get(); |
| if (msgType != null && msgType == MessageType.PING) { |
| messageType.set(null); // set it to null right away. |
| int pingParts = 10; // Some number which will not throw OOM but still be acceptable for a ping operation. |
| if (numParts > pingParts) { |
| throw new IOException("Part length ( " + numParts |
| + " ) is inconsistent for " + MessageType.getString(msgType) |
| + " operation."); |
| } |
| } |
| setNumberOfParts(numParts); |
| if (numParts <= 0) |
| return; |
| |
| if (len < 0) { |
| logger.info(LocalizedMessage.create(LocalizedStrings.Message_RPL_NEG_LEN__0, len)); |
| throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString()); |
| } |
| |
| final ByteBuffer cb = getCommBuffer(); |
| cb.clear(); |
| cb.flip(); |
| |
| int readSecurePart = 0; |
| //TODO:Hitesh look if securePart can be cached here |
| //this.logger.fine("readPayloadFields() early ack = " + this.earlyAck); |
| readSecurePart = checkAndSetSecurityPart(); |
| |
| int bytesRemaining = len; |
| //this.logger.fine("readPayloadFields() : numParts=" + numParts + " len=" + len); |
| for (int i = 0; ((i < numParts + readSecurePart) || ((readSecurePart == 1) && (cb |
| .remaining() > 0))); i++) { |
| int bytesReadThisTime = readPartChunk(bytesRemaining); |
| bytesRemaining -= bytesReadThisTime; |
| |
| Part part; |
| |
| if(i < numParts) { |
| part = this.partsList[i]; |
| } |
| else { |
| part = this.securePart; |
| } |
| |
| int partLen = cb.getInt(); |
| byte partType = cb.get(); |
| byte[] partBytes = null; |
| // this.logger.fine("readPayloadFields(): partLen=" + partLen + " partType=" + partType); |
| if (partLen > 0) { |
| // if (partLen >= MIN_PART_BUFFER_SIZE && partLen <= MAX_PART_BUFFER_SIZE) { |
| // partBytes = getPartBuffer(partLen); |
| // } |
| // if (partBytes == null) { |
| partBytes = new byte[partLen]; |
| // } |
| int alreadyReadBytes = cb.remaining(); |
| if (alreadyReadBytes > 0) { |
| if (partLen < alreadyReadBytes) { |
| alreadyReadBytes = partLen; |
| } |
| cb.get(partBytes, 0, alreadyReadBytes); |
| } |
| // now we need to read partLen - alreadyReadBytes off the wire |
| int off = alreadyReadBytes; |
| int remaining = partLen - off; |
| while (remaining > 0) { |
| if (this.sockCh != null) { |
| int bytesThisTime = remaining; |
| cb.clear(); |
| if (bytesThisTime > cb.capacity()) { |
| bytesThisTime = cb.capacity(); |
| } |
| cb.limit(bytesThisTime); |
| int res = this.sockCh.read(cb); |
| //System.out.println("DEBUG: part read " + res + " bytes commBuffer=" + cb); |
| if (res != -1) { |
| cb.flip(); |
| bytesRemaining -= res; |
| remaining -= res; |
| cb.get(partBytes, off, res); |
| off += res; |
| if (this.msgStats != null) { |
| this.msgStats.incReceivedBytes(res); |
| } |
| } else { |
| throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_A_PART.toLocalizedString()); |
| } |
| } else { |
| int res = 0; |
| try { |
| res = this.is.read(partBytes, off, remaining); |
| } |
| catch (SocketTimeoutException e) { |
| // res = 0; |
| // TODO: add cancellation check |
| throw e; |
| } |
| if (res != -1) { |
| bytesRemaining -= res; |
| remaining -= res; |
| off += res; |
| if (this.msgStats != null) { |
| this.msgStats.incReceivedBytes(res); |
| } |
| } else { |
| throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_A_PART.toLocalizedString()); |
| } |
| } |
| } |
| } |
| part.init(partBytes, partType); |
| } |
| } |
| |
| protected int checkAndSetSecurityPart() { |
| if ((this.earlyAck | MESSAGE_HAS_SECURE_PART) == this.earlyAck) { |
| this.securePart = new Part(); |
| return 1; |
| } |
| else { |
| this.securePart = null; |
| return 0; |
| } |
| } |
| |
| /** |
| * @param bytesRemaining the most bytes we can read |
| * @return the number of bytes read into commBuffer |
| */ |
| private int readPartChunk(int bytesRemaining) throws IOException { |
| final ByteBuffer cb = getCommBuffer(); |
| //this.logger.info("DEBUG: commBuffer.remaining=" + cb.remaining()); |
| if (cb.remaining() >= PART_HEADER_SIZE) { |
| // we already have the next part header in commBuffer so just return |
| return 0; |
| } |
| if (cb.position() != 0) { |
| cb.compact(); |
| } else { |
| cb.position(cb.limit()); |
| cb.limit(cb.capacity()); |
| } |
| int bytesRead = 0; |
| if (this.sc != null) { |
| // Keep track of the fact that we are making progress |
| this.sc.updateProcessingMessage(); |
| } |
| if (this.sockCh != null) { |
| int remaining = cb.remaining(); |
| if (remaining > bytesRemaining) { |
| remaining = bytesRemaining; |
| cb.limit(cb.position()+bytesRemaining); |
| } |
| while (remaining > 0) { |
| int res = this.sockCh.read(cb); |
| //System.out.println("DEBUG: partChunk read " + res + " bytes commBuffer=" + cb); |
| if (res != -1) { |
| remaining -= res; |
| bytesRead += res; |
| if (this.msgStats != null) { |
| this.msgStats.incReceivedBytes(res); |
| } |
| } else { |
| throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_PAYLOAD.toLocalizedString()); |
| } |
| } |
| |
| } else { |
| int bufSpace = cb.capacity() - cb.position(); |
| int bytesToRead = bufSpace; |
| if (bytesRemaining < bytesToRead) { |
| bytesToRead = bytesRemaining; |
| } |
| int pos = cb.position(); |
| while (bytesToRead > 0) { |
| int res = 0; |
| try { |
| res = this.is.read(cb.array(), pos, bytesToRead); |
| } |
| catch (SocketTimeoutException e) { |
| // res = 0; |
| // TODO add a cancellation check |
| throw e; |
| } |
| if (res != -1) { |
| bytesToRead -= res; |
| pos += res; |
| bytesRead += res; |
| if (this.msgStats != null) { |
| this.msgStats.incReceivedBytes(res); |
| } |
| } else { |
| throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_PAYLOAD.toLocalizedString()); |
| } |
| } |
| cb.position(pos); |
| } |
| cb.flip(); |
| return bytesRead; |
| } |
| |
| /** |
| * Gets rid of all the parts that have been added to this message. |
| */ |
| public void clearParts() { |
| for (int i=0; i< partsList.length; i++){ |
| partsList[i].clear(); |
| } |
| this.currentPart=0; |
| } |
| public void setComms(ServerConnection sc, Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException { |
| this.sc = sc; |
| setComms(socket, bb, msgStats); |
| } |
| |
| public void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException { |
| this.sockCh = socket.getChannel(); |
| if (this.sockCh == null) { |
| setComms(socket, SocketUtils.getInputStream(socket), SocketUtils.getOutputStream(socket), bb, msgStats); |
| } else { |
| setComms(socket, null, null, bb, msgStats); |
| } |
| } |
| |
| public void setComms(Socket socket, InputStream is, OutputStream os, ByteBuffer bb, MessageStats msgStats) |
| throws IOException |
| { |
| Assert.assertTrue(socket != null); |
| this.socket = socket; |
| this.sockCh = socket.getChannel(); |
| this.is = is; |
| this.os = os; |
| this.cachedCommBuffer = bb; |
| this.msgStats = msgStats; |
| } |
| /** |
| * Undo any state changes done by setComms. |
| * @since 5.7 |
| */ |
| public void unsetComms() { |
| this.socket = null; |
| this.sockCh = null; |
| this.is = null; |
| this.os = null; |
| this.cachedCommBuffer = null; |
| this.msgStats = null; |
| } |
| |
| /** |
| * Sends this message to its receiver over its |
| * setOutputStream?? output stream. |
| */ |
| public void send() |
| throws IOException { |
| send(true); |
| } |
| |
| public void send(ServerConnection servConn) |
| throws IOException { |
| if (this.sc != servConn) throw new IllegalStateException("this.sc was not correctly set"); |
| send(true); |
| } |
| |
| /** |
| * Sends this message to its receiver over its |
| * setOutputStream?? output stream. |
| */ |
| public void send(boolean clearMessage) |
| throws IOException { |
| sendBytes(clearMessage); |
| } |
| |
| /** |
| * Populates the stats of this <code>Message</code> with information |
| * received via its socket |
| */ |
| public void recv() |
| throws IOException { |
| if (this.socket != null) { |
| synchronized(getCommBuffer()) { |
| read(); |
| } |
| } |
| else { |
| throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString()); |
| } |
| } |
| public void recv(ServerConnection sc, int MAX_DATA, Semaphore dataLimiter, int MAX_MSGS, Semaphore msgLimiter) |
| throws IOException { |
| this.sc = sc; |
| this.MAX_DATA = MAX_DATA; |
| this.dataLimiter = dataLimiter; |
| // this.MAX_MSGS = MAX_MSGS; |
| this.msgLimiter = msgLimiter; |
| recv(); |
| } |
| |
| public boolean canStartRemoteTransaction() { |
| return true; |
| } |
| } |