| /*========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| |
| package com.gemstone.gemfire.internal.cache.tier.sockets; |
| |
| import com.gemstone.gemfire.internal.Version; |
| import com.gemstone.gemfire.internal.cache.tier.MessageType; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.LogService; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.net.SocketTimeoutException; |
| import java.nio.ByteBuffer; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| /** |
| * Class <code>ChunkedMessage</code> is used to send messages from a server to |
| * a client divided into chunks. |
| * |
| * This class encapsulates the wire protocol. It provides accessors to encode |
| * and decode a message and serialize it out to the wire. |
| * |
| * <PRE> |
| * |
| * msgType - int - 4 bytes type of message, types enumerated below |
| * |
| * numberOfParts - int - 4 bytes number of elements (LEN-BYTE* pairs) contained |
| * in the payload. Message can be a multi-part message |
| * |
| * transId - int - 4 bytes filled in by the requestor, copied back into the |
| * response len1 part1 . . . lenn partn |
| * |
| * </PRE> |
| * |
| * We read the fixed length 15 bytes into a byte[] and populate a bytebuffer We |
| * read the fixed length header tokens from the header parse the header and use |
| * information contained in there to read the payload. |
| * |
| * <P> |
| * |
| * See also <a href="package-summary.html#messages">package description </a>. |
| * |
| * @see com.gemstone.gemfire.internal.cache.tier.MessageType |
| * |
| * @author Barry Oglesby |
| * |
| * @since 4.2 |
| */ |
| public class ChunkedMessage extends Message |
| { |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** |
| * The chunk header length. |
| * The chunk header contains a 5-byte int chunk length (4 bytes for the chunk |
| * length and 1 byte for the last chunk boolean) |
| */ |
| private static final int CHUNK_HEADER_LENGTH = 5; |
| /** |
| * The main header length. The main header contains 3 4-byte ints |
| */ |
| private static final int CHUNK_MSG_HEADER_LENGTH = 12; |
| |
| |
| /** |
| * The chunk's payload length |
| */ |
| protected int chunkLength; |
| |
| /** |
| * Whether this is the last chunk |
| */ |
| protected byte lastChunk; |
| |
| // /** |
| // * The main header length. The main header contains 3 4-byte ints |
| // */ |
| // private static final int HEADER_LENGTH = 12; |
| |
| /** |
| * Initially false; set to true once the message header is sent; set back to |
| * false when last chunk is sent. |
| */ |
| private transient boolean headerSent = false; |
| |
| @Override |
| public String toString() { |
| StringBuffer sb = new StringBuffer(); |
| |
| sb.append(super.toString()); |
| sb.append("; chunkLength= " + chunkLength); |
| sb.append("; lastChunk=" + lastChunk); |
| return sb.toString(); |
| } |
| |
| /** |
| * Creates a new message with the given number of parts |
| * @param numberOfParts The number of parts to create |
| */ |
| public ChunkedMessage(int numberOfParts, Version version) { |
| super(numberOfParts, version); |
| } |
| |
| /** |
| * Returns the header length. |
| * @return the header length |
| */ |
| @Override |
| public int getHeaderLength() { |
| return CHUNK_MSG_HEADER_LENGTH; |
| } |
| |
| /** |
| * Sets whether this is the last chunk. |
| * @param lastChunk Whether this is the last chunk |
| */ |
| public void setLastChunk(boolean lastChunk) { |
| //TODO:hitesh now it should send security header(connectionID) |
| if(lastChunk){ |
| this.lastChunk=0X01; |
| setFESpecialCase(); |
| } |
| else { |
| this.lastChunk= 0X00; |
| } |
| } |
| |
| private void setFESpecialCase(){ |
| byte b = ServerConnection.isExecuteFunctionOnLocalNodeOnly(); |
| if((b & 1) == 1) { |
| //we are in special function execution case, where filter key is one only |
| //now checking whether this function executed locally or not. |
| //if not then inform client so that it will refresh pr-meta-data |
| if(((b & 2) == 2)) { |
| |
| this.lastChunk |= 0x04;//setting third bit, we are okay |
| } |
| } |
| } |
| public void setLastChunkAndNumParts(boolean lastChunk, int numParts) { |
| setLastChunk(lastChunk); |
| if (this.sc != null |
| && this.sc.getClientVersion().compareTo(Version.GFE_65) >= 0) { |
| //we us e three bits for number of parts in last chunk byte |
| //we us e three bits for number of parts in last chunk byte |
| byte localLastChunk = (byte)(numParts << 5); |
| this.lastChunk |= localLastChunk; |
| } |
| } |
| |
| public void setServerConnection(ServerConnection servConn) |
| { |
| if (this.sc != servConn) throw new IllegalStateException("this.sc was not correctly set"); |
| } |
| |
| /** |
| * Answers whether this is the last chunk. |
| * @return whether this is the last chunk |
| */ |
| public boolean isLastChunk() { |
| if((this.lastChunk & 0X01) == 0X01) { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Returns the chunk length. |
| * |
| * @return the chunk length |
| */ |
| public int getChunkLength() |
| { |
| return this.chunkLength; |
| } |
| |
| /** |
| * Populates the header with information received via socket |
| */ |
| public void readHeader() throws IOException { |
| if (this.socket != null) { |
| final ByteBuffer cb = getCommBuffer(); |
| synchronized(cb) { |
| fetchHeader(); |
| final int type = cb.getInt(); |
| final int numParts = cb.getInt(); |
| final int txid = cb.getInt(); |
| cb.clear(); |
| if (!MessageType.validate(type)) { |
| throw new IOException(LocalizedStrings.ChunkedMessage_INVALID_MESSAGE_TYPE_0_WHILE_READING_HEADER.toLocalizedString(Integer.valueOf(type))); |
| } |
| |
| // Set the header and payload fields only after receiving all the |
| // socket data, providing better message consistency in the face |
| // of exceptional conditions (e.g. IO problems, timeouts etc.) |
| this.msgType = type; |
| this.numberOfParts = numParts; // Already set in setPayloadFields via setNumberOfParts |
| this.transactionId = txid; |
| } |
| } |
| else { |
| throw new IOException(LocalizedStrings.ChunkedMessage_DEAD_CONNECTION.toLocalizedString()); |
| } |
| } |
| |
| /** |
| * Reads a chunk of this message. |
| */ |
| public void receiveChunk() throws IOException { |
| if (this.socket != null) { |
| synchronized(getCommBuffer()) { |
| readChunk(); |
| } |
| } |
| else { |
| throw new IOException(LocalizedStrings.ChunkedMessage_DEAD_CONNECTION.toLocalizedString()); |
| } |
| } |
| |
| /** |
| * Reads a chunk of this message. |
| */ |
| private void readChunk() throws IOException { |
| final ByteBuffer cb = getCommBuffer(); |
| clearParts(); |
| cb.clear(); |
| int totalBytesRead = 0; |
| do { |
| //@TODO DARREL: add channel support |
| int bytesRead = 0; |
| try { |
| bytesRead = is.read(cb.array(), totalBytesRead, CHUNK_HEADER_LENGTH-totalBytesRead); |
| } |
| catch (SocketTimeoutException e) { |
| // bytesRead = 0; |
| // TODO add a cancellation check |
| throw e; |
| } |
| if (bytesRead == -1) { |
| throw new EOFException(LocalizedStrings.ChunkedMessage_CHUNK_READ_ERROR_CONNECTION_RESET.toLocalizedString()); |
| } |
| totalBytesRead += bytesRead; |
| if (this.msgStats != null) { |
| this.msgStats.incReceivedBytes(bytesRead); |
| } |
| } while (totalBytesRead < CHUNK_HEADER_LENGTH); |
| |
| cb.rewind(); |
| |
| // Set chunk length and last chunk |
| this.chunkLength = cb.getInt(); |
| //setLastChunk(cb.get() == 0x01); |
| byte lastChunk = cb.get(); |
| setLastChunk((lastChunk & 0x01) == 0x01); |
| if ((lastChunk & 0x02) == 0x02) { |
| this.securePart = new Part(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("ChunkedMessage.readChunk() securePart present"); |
| } |
| } |
| cb.clear(); |
| if ((lastChunk & 0x01) == 0x01) { |
| int numParts = lastChunk >> 5; |
| if (numParts > 0) { |
| this.numberOfParts = numParts; |
| } |
| } |
| readPayloadFields(this.numberOfParts, this.chunkLength); |
| } |
| |
| /** |
| * Sends the header of this message. |
| */ |
| public void sendHeader() throws IOException { |
| if (this.socket != null) { |
| synchronized(getCommBuffer()) { |
| getHeaderBytesForWrite(); |
| flushBuffer(); |
| // Darrel says: I see no need for the following os.flush() call |
| // so I've deadcoded it for performance. |
| // this.os.flush(); |
| } |
| this.currentPart=0; |
| this.headerSent = true; |
| } |
| else { |
| throw new IOException(LocalizedStrings.ChunkedMessage_DEAD_CONNECTION.toLocalizedString()); |
| } |
| } |
| |
| /** |
| * Return true if the header for this message has already been sent. |
| */ |
| public boolean headerHasBeenSent() |
| { |
| return this.headerSent; |
| } |
| |
| /** |
| * Sends a chunk of this message. |
| */ |
| public void sendChunk() throws IOException { |
| if (isLastChunk()) { |
| this.headerSent = false; |
| } |
| sendBytes(true); |
| } |
| |
| /** |
| * Sends a chunk of this message. |
| */ |
| public void sendChunk(ServerConnection servConn) throws IOException { |
| if (this.sc != servConn) throw new IllegalStateException("this.sc was not correctly set"); |
| sendChunk(); |
| } |
| |
| @Override |
| protected Part getSecurityPart() { |
| // TODO Auto-generated method stub |
| if (this.isLastChunk()) |
| return super.getSecurityPart(); |
| else |
| return null; |
| } |
| |
| @Override |
| protected int checkAndSetSecurityPart() { |
| return (this.securePart != null) ? 1 : 0; |
| } |
| |
| @Override |
| protected void packHeaderInfoForSending(int msgLen, boolean isSecurityHeader) { |
| final ByteBuffer cb = getCommBuffer(); |
| cb.putInt(msgLen); |
| byte isLastChunk = 0x00; |
| if (isLastChunk()) { |
| //isLastChunk = (byte) 0x01 ; |
| isLastChunk = this.lastChunk ; |
| if (isSecurityHeader) { |
| isLastChunk |= 0x02; |
| } |
| } |
| //cb.put(isLastChunk() ? (byte) 0x01 : (byte) 0x00); |
| cb.put(isLastChunk); |
| } |
| |
| /** |
| * Converts the header of this message into a <code>byte</code> array using a |
| * {@link ByteBuffer}. |
| */ |
| protected void getHeaderBytesForWrite() { |
| final ByteBuffer cb = getCommBuffer(); |
| cb.clear(); |
| cb.putInt(this.msgType); |
| cb.putInt(this.numberOfParts); |
| |
| cb.putInt(this.transactionId); |
| } |
| } |