| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.tier.sockets; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.Socket; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.SocketChannel; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.SerializationException; |
| import org.apache.geode.annotations.Immutable; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.HeapDataOutputStream; |
| import org.apache.geode.internal.cache.TXManagerImpl; |
| import org.apache.geode.internal.cache.tier.MessageType; |
| import org.apache.geode.internal.offheap.StoredObject; |
| import org.apache.geode.internal.offheap.annotations.Unretained; |
| import org.apache.geode.internal.serialization.Version; |
| import org.apache.geode.internal.util.BlobHelper; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * This class encapsulates the wire protocol. It provides accessors to encode and decode a message |
| * and serialize it out to the wire. |
| * |
| * <PRE> |
| * messageType - int - 4 bytes type of message, types enumerated below |
| * |
| * msgLength - int - 4 bytes total length of variable length payload |
| * |
| * numberOfParts - int - 4 bytes number of elements (LEN-BYTE* pairs) |
| * contained in the payload. Message can |
| * be a multi-part message |
| * |
| * transId - int - 4 bytes filled in by the requester, copied back into |
| * the response |
| * |
| * flags - byte- 1 byte filled in by the requester |
| * len1 |
| * part1 |
| * . |
| * . |
| * . |
| * lenn |
| * partn |
| * </PRE> |
| * |
| * We read the fixed length 16 bytes into a byte[] and populate a bytebuffer We read the fixed |
| * length header tokens from the header parse the header and use information contained in there to |
| * read the payload. |
| * |
| * <P> |
| * |
| * See also <a href="package-summary.html#messages">package description</a>. |
| * |
| * @see MessageType |
| */ |
| public class Message { |
| |
| // Tentative workaround to avoid OOM stated in #46754. |
| public static final ThreadLocal<Integer> MESSAGE_TYPE = new ThreadLocal<>(); |
| |
| public static final String MAX_MESSAGE_SIZE_PROPERTY = |
| DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size"; |
| |
| static final int DEFAULT_MAX_MESSAGE_SIZE = 1073741824; |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| private static final int PART_HEADER_SIZE = 5; // 4 bytes for length, 1 byte for isObject |
| |
| private static final int FIXED_LENGTH = 17; |
| |
| private static final ThreadLocal<ByteBuffer> tlCommBuffer = new ThreadLocal<>(); |
| |
| // These two statics are fields shoved into the flags byte for transmission. |
| // The MESSAGE_IS_RETRY bit is stripped out during deserialization but the other |
| // is left in place |
| private static final byte MESSAGE_HAS_SECURE_PART = (byte) 0x02; |
| private static final byte MESSAGE_IS_RETRY = (byte) 0x04; |
| |
| private static final byte MESSAGE_IS_RETRY_MASK = (byte) 0xFB; |
| |
| private static final int DEFAULT_CHUNK_SIZE = 1024; |
| |
| @Immutable |
| private static final byte[] TRUE = defineTrue(); |
| @Immutable |
| private static final byte[] FALSE = defineFalse(); |
| |
| private static byte[] defineTrue() { |
| try (HeapDataOutputStream hdos = new HeapDataOutputStream(10, null)) { |
| BlobHelper.serializeTo(Boolean.TRUE, hdos); |
| return hdos.toByteArray(); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| private static byte[] defineFalse() { |
| try (HeapDataOutputStream hdos = new HeapDataOutputStream(10, null)) { |
| BlobHelper.serializeTo(Boolean.FALSE, hdos); |
| return hdos.toByteArray(); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| /** |
| * The maximum size of an outgoing message. If the message is larger than this maximum, it may |
| * cause the receiver to throw an exception on message part length mismatch due to overflow in |
| * message size. |
| * |
| * This value is STATIC because getting a system property requires holding a lock. It is costly to |
| * do this for every message sent. If this value needs to be modified for testing, please add a |
| * new constructor. |
| */ |
| private static final int maxMessageSize = |
| Integer.getInteger(MAX_MESSAGE_SIZE_PROPERTY, DEFAULT_MAX_MESSAGE_SIZE); |
| |
| protected int messageType; |
| private int payloadLength = 0; |
| int numberOfParts = 0; |
| protected int transactionId = TXManagerImpl.NOTX; |
| int currentPart = 0; |
| private Part[] partsList = null; |
| private ByteBuffer cachedCommBuffer; |
| protected Socket socket = null; |
| private SocketChannel socketChannel = null; |
| private OutputStream outputStream = null; |
| protected InputStream inputStream = null; |
| private boolean messageModified = true; |
| |
| /** is this message a retry of a previously sent message? */ |
| private boolean isRetry; |
| |
| private byte flags = 0x00; |
| MessageStats messageStats = null; |
| protected ServerConnection serverConnection = null; |
| private int maxIncomingMessageLength = -1; |
| private Semaphore dataLimiter = null; |
| private Semaphore messageLimiter = null; |
| private boolean readHeader = false; |
| private int chunkSize = DEFAULT_CHUNK_SIZE; |
| |
| Part securePart = null; |
| private boolean isMetaRegion = false; |
| |
| private Version version; |
| |
| /** |
| * Creates a new message with the given number of parts |
| */ |
| public Message(int numberOfParts, Version destVersion) { |
| this.version = destVersion; |
| Assert.assertTrue(destVersion != null, "Attempt to create an unversioned message"); |
| this.partsList = new Part[numberOfParts]; |
| this.numberOfParts = numberOfParts; |
| int partsListLength = this.partsList.length; |
| for (int i = 0; i < partsListLength; i++) { |
| this.partsList[i] = new Part(); |
| } |
| } |
| |
| public boolean isSecureMode() { |
| return this.securePart != null; |
| } |
| |
| public byte[] getSecureBytes() throws IOException, ClassNotFoundException { |
| return (byte[]) this.securePart.getObject(); |
| } |
| |
| public void setMessageType(int msgType) { |
| this.messageModified = true; |
| if (!MessageType.validate(msgType)) { |
| throw new IllegalArgumentException( |
| "Invalid MessageType"); |
| } |
| this.messageType = msgType; |
| } |
| |
| public void setVersion(Version clientVersion) { |
| this.version = clientVersion; |
| } |
| |
| public void setMessageHasSecurePartFlag() { |
| this.flags |= MESSAGE_HAS_SECURE_PART; |
| } |
| |
| public void clearMessageHasSecurePartFlag() { |
| this.flags &= MESSAGE_HAS_SECURE_PART; |
| } |
| |
| /** |
| * Sets and builds the {@link Part}s that are sent in the payload of the Message |
| */ |
| public void setNumberOfParts(int numberOfParts) { |
| // hitesh: need to add security header here from server |
| // need to insure it is not chunked message |
| // should we look message type to avoid internal message like ping |
| this.messageModified = true; |
| this.currentPart = 0; |
| this.numberOfParts = numberOfParts; |
| if (numberOfParts > this.partsList.length) { |
| Part[] newPartsList = new Part[numberOfParts]; |
| for (int i = 0; i < numberOfParts; i++) { |
| if (i < this.partsList.length) { |
| newPartsList[i] = this.partsList[i]; |
| } else { |
| newPartsList[i] = new Part(); |
| } |
| } |
| this.partsList = newPartsList; |
| } |
| } |
| |
| /** |
| * For boundary testing we may need to inject mock parts. For testing only. |
| */ |
| void setParts(Part[] parts) { |
| this.partsList = parts; |
| } |
| |
| public void setTransactionId(int transactionId) { |
| this.messageModified = true; |
| this.transactionId = transactionId; |
| } |
| |
| public void setIsRetry() { |
| this.isRetry = true; |
| } |
| |
| /** |
| * This returns true if the message has been marked as having been previously transmitted to a |
| * different server. |
| */ |
| public boolean isRetry() { |
| return this.isRetry; |
| } |
| |
| /* Sets size for HDOS chunk. */ |
| public void setChunkSize(int chunkSize) { |
| this.chunkSize = chunkSize; |
| } |
| |
| /** |
| * When building a Message this will return the number of the next Part to be added to the message |
| */ |
| int getNextPartNumber() { |
| return this.currentPart; |
| } |
| |
| public void addStringPart(String str) { |
| addStringPart(str, false); |
| } |
| |
| @MakeNotStatic("not tied to the cache lifecycle") |
| private static final Map<String, byte[]> CACHED_STRINGS = new ConcurrentHashMap<>(); |
| |
| public void addStringPart(String str, boolean enableCaching) { |
| if (str == null) { |
| addRawPart(null, false); |
| return; |
| } |
| |
| Part part = this.partsList[this.currentPart]; |
| if (enableCaching) { |
| byte[] bytes = CACHED_STRINGS.get(str); |
| if (bytes == null) { |
| try (HeapDataOutputStream hdos = new HeapDataOutputStream(str)) { |
| bytes = hdos.toByteArray(); |
| CACHED_STRINGS.put(str, bytes); |
| } |
| } |
| part.setPartState(bytes, false); |
| |
| } else { |
| // do NOT close the HeapDataOutputStream |
| this.messageModified = true; |
| part.setPartState(new HeapDataOutputStream(str), false); |
| } |
| this.currentPart++; |
| } |
| |
| /* |
| * Adds a new part to this message that contains a {@code byte} array (as opposed to a serialized |
| * object). |
| * |
| * @see #addPart(byte[], boolean) |
| */ |
| public void addBytesPart(byte[] newPart) { |
| addRawPart(newPart, false); |
| } |
| |
| public void addStringOrObjPart(Object o) { |
| if (o instanceof String || o == null) { |
| addStringPart((String) o); |
| } else { |
| // Note even if o is a byte[] we need to serialize it. |
| // This could be cleaned up but it would require C client code to change. |
| serializeAndAddPart(o, false); |
| } |
| } |
| |
| public void addObjPart(Object o) { |
| addObjPart(o, false); |
| } |
| |
| /** |
| * Like addObjPart(Object) but also prefers to reference objects in the part instead of copying |
| * them into a byte buffer. |
| */ |
| public void addObjPartNoCopying(Object o) { |
| if (o == null || o instanceof byte[]) { |
| addRawPart((byte[]) o, false); |
| } else { |
| serializeAndAddPartNoCopying(o); |
| } |
| } |
| |
| public void addObjPart(Object o, boolean zipValues) { |
| if (o == null || o instanceof byte[]) { |
| addRawPart((byte[]) o, false); |
| } else if (o instanceof Boolean) { |
| addRawPart((Boolean) o ? TRUE : FALSE, true); |
| } else { |
| serializeAndAddPart(o, zipValues); |
| } |
| } |
| |
| /** |
| * Object o is always null |
| */ |
| public void addPartInAnyForm(@Unretained Object o, boolean isObject) { |
| if (o == null) { |
| addRawPart((byte[]) o, false); |
| } else if (o instanceof byte[]) { |
| addRawPart((byte[]) o, isObject); |
| } else if (o instanceof StoredObject) { |
| // It is possible it is an off-heap StoredObject that contains a simple non-object byte[]. |
| this.messageModified = true; |
| Part part = this.partsList[this.currentPart]; |
| part.setPartState((StoredObject) o, isObject); |
| this.currentPart++; |
| } else { |
| serializeAndAddPart(o, false); |
| } |
| } |
| |
| private void serializeAndAddPartNoCopying(Object o) { |
| Version v = this.version; |
| if (this.version.equals(Version.CURRENT)) { |
| v = null; |
| } |
| |
| // Create the HDOS with a flag telling it that it can keep any byte[] or ByteBuffers/ByteSources |
| // passed to it. Do NOT close the HeapDataOutputStream! |
| HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v, true); |
| try { |
| BlobHelper.serializeTo(o, hdos); |
| } catch (IOException ex) { |
| throw new SerializationException("failed serializing object", ex); |
| } |
| this.messageModified = true; |
| Part part = this.partsList[this.currentPart]; |
| part.setPartState(hdos, true); |
| this.currentPart++; |
| } |
| |
| private void serializeAndAddPart(Object o, boolean zipValues) { |
| if (zipValues) { |
| throw new UnsupportedOperationException("zipValues no longer supported"); |
| } |
| |
| Version v = this.version; |
| if (this.version.equals(Version.CURRENT)) { |
| v = null; |
| } |
| |
| // do NOT close the HeapDataOutputStream |
| HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v); |
| try { |
| BlobHelper.serializeTo(o, hdos); |
| } catch (IOException ex) { |
| throw new SerializationException("failed serializing object", ex); |
| } |
| this.messageModified = true; |
| Part part = this.partsList[this.currentPart]; |
| part.setPartState(hdos, true); |
| this.currentPart++; |
| } |
| |
| public void addIntPart(int v) { |
| this.messageModified = true; |
| Part part = this.partsList[this.currentPart]; |
| part.setInt(v); |
| this.currentPart++; |
| } |
| |
| public void addLongPart(long v) { |
| this.messageModified = true; |
| Part part = this.partsList[this.currentPart]; |
| part.setLong(v); |
| this.currentPart++; |
| } |
| |
| public void addBytePart(byte v) { |
| this.messageModified = true; |
| Part part = this.partsList[this.currentPart]; |
| part.setByte(v); |
| this.currentPart++; |
| } |
| |
| /** |
| * Adds a new part to this message that may contain a serialized object. |
| */ |
| public void addRawPart(byte[] newPart, boolean isObject) { |
| this.messageModified = true; |
| Part part = this.partsList[this.currentPart]; |
| part.setPartState(newPart, isObject); |
| this.currentPart++; |
| } |
| |
| public int getMessageType() { |
| return this.messageType; |
| } |
| |
| public int getPayloadLength() { |
| return this.payloadLength; |
| } |
| |
| public int getHeaderLength() { |
| return FIXED_LENGTH; |
| } |
| |
| public int getNumberOfParts() { |
| return this.numberOfParts; |
| } |
| |
| public int getTransactionId() { |
| return this.transactionId; |
| } |
| |
| public Part getPart(int index) { |
| if (index < this.numberOfParts) { |
| Part p = this.partsList[index]; |
| if (this.version != null) { |
| p.setVersion(this.version); |
| } |
| return p; |
| } |
| return null; |
| } |
| |
| public static ByteBuffer setTLCommBuffer(ByteBuffer bb) { |
| ByteBuffer result = tlCommBuffer.get(); |
| tlCommBuffer.set(bb); |
| return result; |
| } |
| |
| public ByteBuffer getCommBuffer() { |
| if (this.cachedCommBuffer != null) { |
| return this.cachedCommBuffer; |
| } else { |
| return tlCommBuffer.get(); |
| } |
| } |
| |
| public void clear() { |
| this.isRetry = false; |
| int len = this.payloadLength; |
| if (len != 0) { |
| this.payloadLength = 0; |
| } |
| if (this.readHeader) { |
| if (this.messageStats != null) { |
| this.messageStats.decMessagesBeingReceived(len); |
| } |
| } |
| ByteBuffer buffer = getCommBuffer(); |
| if (buffer != null) { |
| buffer.clear(); |
| } |
| clearParts(); |
| if (len != 0 && this.dataLimiter != null) { |
| this.dataLimiter.release(len); |
| this.dataLimiter = null; |
| this.maxIncomingMessageLength = 0; |
| } |
| if (this.readHeader) { |
| if (this.messageLimiter != null) { |
| this.messageLimiter.release(1); |
| this.messageLimiter = null; |
| } |
| this.readHeader = false; |
| } |
| this.flags = 0; |
| } |
| |
| protected void packHeaderInfoForSending(int msgLen, boolean isSecurityHeader) { |
| // setting second bit of flags byte for client this is not require but this makes all changes |
| // easily at client side right now just see this bit and process security header |
| byte flagsByte = this.flags; |
| if (isSecurityHeader) { |
| flagsByte |= MESSAGE_HAS_SECURE_PART; |
| } |
| if (this.isRetry) { |
| flagsByte |= MESSAGE_IS_RETRY; |
| } |
| getCommBuffer().putInt(this.messageType).putInt(msgLen).putInt(this.numberOfParts) |
| .putInt(this.transactionId).put(flagsByte); |
| } |
| |
| protected Part getSecurityPart() { |
| if (this.serverConnection != null) { |
| // look types right put get etc |
| return this.serverConnection.updateAndGetSecurityPart(); |
| } |
| return null; |
| } |
| |
| public void setSecurePart(byte[] bytes) { |
| this.securePart = new Part(); |
| this.securePart.setPartState(bytes, false); |
| } |
| |
| public void setMetaRegion(boolean isMetaRegion) { |
| this.isMetaRegion = isMetaRegion; |
| } |
| |
| boolean getAndResetIsMetaRegion() { |
| boolean isMetaRegion = this.isMetaRegion; |
| this.isMetaRegion = false; |
| return isMetaRegion; |
| } |
| |
| /** |
| * Sends this message out on its socket. |
| */ |
| void sendBytes(boolean clearMessage) throws IOException { |
| if (this.serverConnection != null) { |
| // Keep track of the fact that we are making progress. |
| this.serverConnection.updateProcessingMessage(); |
| } |
| if (this.socket == null) { |
| throw new IOException("Dead Connection"); |
| } |
| try { |
| final ByteBuffer commBuffer = getCommBuffer(); |
| if (commBuffer == null) { |
| throw new IOException("No buffer"); |
| } |
| synchronized (commBuffer) { |
| long totalPartLen = 0; |
| long headerLen = 0; |
| int partsToTransmit = this.numberOfParts; |
| |
| for (int i = 0; i < this.numberOfParts; i++) { |
| Part part = this.partsList[i]; |
| headerLen += PART_HEADER_SIZE; |
| totalPartLen += part.getLength(); |
| } |
| |
| Part securityPart = this.getSecurityPart(); |
| if (securityPart == null) { |
| securityPart = this.securePart; |
| } |
| if (securityPart != null) { |
| headerLen += PART_HEADER_SIZE; |
| totalPartLen += securityPart.getLength(); |
| partsToTransmit++; |
| } |
| |
| if (headerLen + totalPartLen > Integer.MAX_VALUE) { |
| throw new MessageTooLargeException( |
| "Message size (" + (headerLen + totalPartLen) + ") exceeds maximum integer value"); |
| } |
| |
| int msgLen = (int) (headerLen + totalPartLen); |
| |
| if (msgLen > this.maxMessageSize) { |
| throw new MessageTooLargeException("Message size (" + msgLen |
| + ") exceeds gemfire.client.max-message-size setting (" + this.maxMessageSize + ")"); |
| } |
| |
| commBuffer.clear(); |
| packHeaderInfoForSending(msgLen, securityPart != null); |
| for (int i = 0; i < partsToTransmit; i++) { |
| Part part = i == this.numberOfParts ? securityPart : this.partsList[i]; |
| |
| if (commBuffer.remaining() < PART_HEADER_SIZE) { |
| flushBuffer(); |
| } |
| |
| int partLen = part.getLength(); |
| commBuffer.putInt(partLen); |
| commBuffer.put(part.getTypeCode()); |
| if (partLen <= commBuffer.remaining()) { |
| part.writeTo(commBuffer); |
| } else { |
| flushBuffer(); |
| if (this.socketChannel != null) { |
| part.writeTo(this.socketChannel, commBuffer); |
| } else { |
| part.writeTo(this.outputStream, commBuffer); |
| } |
| if (this.messageStats != null) { |
| this.messageStats.incSentBytes(partLen); |
| } |
| } |
| } |
| if (commBuffer.position() != 0) { |
| flushBuffer(); |
| } |
| this.messageModified = false; |
| if (this.socketChannel == null) { |
| this.outputStream.flush(); |
| } |
| } |
| } finally { |
| if (clearMessage) { |
| clearParts(); |
| } |
| } |
| } |
| |
| void flushBuffer() throws IOException { |
| final ByteBuffer cb = getCommBuffer(); |
| if (this.socketChannel != null) { |
| cb.flip(); |
| do { |
| this.socketChannel.write(cb); |
| } while (cb.remaining() > 0); |
| } else { |
| this.outputStream.write(cb.array(), 0, cb.position()); |
| } |
| if (this.messageStats != null) { |
| this.messageStats.incSentBytes(cb.position()); |
| } |
| cb.clear(); |
| } |
| |
| private void readHeaderAndBody(boolean setHeaderReadTimeout, int headerReadTimeoutMillis) |
| throws IOException { |
| clearParts(); |
| // TODO: for server changes make sure sc is not null as this class also used by client |
| |
| int oldTimeout = -1; |
| if (setHeaderReadTimeout) { |
| oldTimeout = socket.getSoTimeout(); |
| socket.setSoTimeout(headerReadTimeoutMillis); |
| } |
| try { |
| fetchHeader(); |
| } finally { |
| if (setHeaderReadTimeout) { |
| socket.setSoTimeout(oldTimeout); |
| } |
| } |
| |
| final ByteBuffer cb = getCommBuffer(); |
| final int type = cb.getInt(); |
| final int len = cb.getInt(); |
| final int numParts = cb.getInt(); |
| final int txid = cb.getInt(); |
| byte bits = cb.get(); |
| cb.clear(); |
| |
| if (!MessageType.validate(type)) { |
| throw new IOException(String.format("Invalid message type %s while reading header", |
| type)); |
| } |
| |
| int timeToWait = 0; |
| if (this.serverConnection != null) { |
| // Keep track of the fact that a message is being processed. |
| this.serverConnection.setProcessingMessage(); |
| timeToWait = this.serverConnection.getClientReadTimeout(); |
| } |
| this.readHeader = true; |
| |
| if (this.messageLimiter != null) { |
| for (;;) { |
| this.serverConnection.getCachedRegionHelper().checkCancelInProgress(null); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| if (timeToWait == 0) { |
| this.messageLimiter.acquire(1); |
| } else { |
| if (!this.messageLimiter.tryAcquire(1, timeToWait, TimeUnit.MILLISECONDS)) { |
| if (this.messageStats instanceof CacheServerStats) { |
| ((CacheServerStats) this.messageStats).incConnectionsTimedOut(); |
| } |
| throw new IOException( |
| String.format( |
| "Operation timed out on server waiting on concurrent message limiter after waiting %s milliseconds", |
| timeToWait)); |
| } |
| } |
| break; |
| } catch (InterruptedException ignore) { |
| interrupted = true; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| } |
| |
| if (len > 0) { |
| if (this.maxIncomingMessageLength > 0 && len > this.maxIncomingMessageLength) { |
| throw new IOException(String.format("Message size %s exceeded max limit of %s", |
| new Object[] {len, this.maxIncomingMessageLength})); |
| } |
| |
| if (this.dataLimiter != null) { |
| for (;;) { |
| if (this.serverConnection != null) { |
| this.serverConnection.getCachedRegionHelper().checkCancelInProgress(null); |
| } |
| boolean interrupted = Thread.interrupted(); |
| try { |
| if (timeToWait == 0) { |
| this.dataLimiter.acquire(len); |
| } else { |
| int newTimeToWait = timeToWait; |
| if (this.messageLimiter != null) { |
| // may have waited for msg limit so recalc time to wait |
| newTimeToWait -= (int) this.serverConnection.getCurrentMessageProcessingTime(); |
| } |
| if (newTimeToWait <= 0 |
| || !this.messageLimiter.tryAcquire(1, newTimeToWait, TimeUnit.MILLISECONDS)) { |
| throw new IOException( |
| String.format( |
| "Operation timed out on server waiting on concurrent data limiter after waiting %s milliseconds", |
| timeToWait)); |
| } |
| } |
| // makes sure payloadLength gets set now so we will release the semaphore |
| this.payloadLength = len; |
| break; // success |
| } catch (InterruptedException ignore) { |
| interrupted = true; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| } |
| if (this.messageStats != null) { |
| this.messageStats.incMessagesBeingReceived(len); |
| this.payloadLength = len; // makes sure payloadLength gets set now so we will dec on clear |
| } |
| |
| this.isRetry = (bits & MESSAGE_IS_RETRY) != 0; |
| bits &= MESSAGE_IS_RETRY_MASK; |
| this.flags = bits; |
| this.messageType = type; |
| |
| readPayloadFields(numParts, len); |
| |
| // Set the header and payload fields only after receiving all the |
| // socket data, providing better message consistency in the face |
| // of exceptional conditions (e.g. IO problems, timeouts etc.) |
| this.payloadLength = len; |
| // this.numberOfParts = numParts; Already set in setPayloadFields via setNumberOfParts |
| this.transactionId = txid; |
| this.flags = bits; |
| if (this.serverConnection != null) { |
| // Keep track of the fact that a message is being processed. |
| this.serverConnection.updateProcessingMessage(); |
| } |
| } |
| |
| /** |
| * Read the actual bytes of the header off the socket |
| */ |
| void fetchHeader() throws IOException { |
| final ByteBuffer cb = getCommBuffer(); |
| cb.clear(); |
| |
| // messageType is invalidated here and can be used as an indicator |
| // of problems reading the message |
| this.messageType = MessageType.INVALID; |
| |
| final int headerLength = getHeaderLength(); |
| if (this.socketChannel != null) { |
| cb.limit(headerLength); |
| do { |
| int bytesRead = this.socketChannel.read(cb); |
| if (bytesRead == -1) { |
| throw new EOFException( |
| "The connection has been reset while reading the header"); |
| } |
| if (this.messageStats != null) { |
| this.messageStats.incReceivedBytes(bytesRead); |
| } |
| } while (cb.remaining() > 0); |
| cb.flip(); |
| |
| } else { |
| int hdr = 0; |
| do { |
| int bytesRead = this.inputStream.read(cb.array(), hdr, headerLength - hdr); |
| if (bytesRead == -1) { |
| throw new EOFException( |
| "The connection has been reset while reading the header"); |
| } |
| hdr += bytesRead; |
| if (this.messageStats != null) { |
| this.messageStats.incReceivedBytes(bytesRead); |
| } |
| } while (hdr < headerLength); |
| |
| // now setup the commBuffer for the caller to parse it |
| cb.rewind(); |
| } |
| } |
| |
| /** |
| * TODO: refactor overly long method readPayloadFields |
| */ |
| void readPayloadFields(final int numParts, final int len) throws IOException { |
| if (len > 0 && numParts <= 0 || len <= 0 && numParts > 0) { |
| throw new IOException( |
| String.format("Part length ( %s ) and number of parts ( %s ) inconsistent", |
| new Object[] {len, numParts})); |
| } |
| |
| Integer msgType = MESSAGE_TYPE.get(); |
| if (msgType != null && msgType == MessageType.PING) { |
| // set it to null right away. |
| MESSAGE_TYPE.set(null); |
| // Some number which will not throw OOM but still be acceptable for a ping operation. |
| int pingParts = 10; |
| if (numParts > pingParts) { |
| throw new IOException("Part length ( " + numParts + " ) is inconsistent for " |
| + MessageType.getString(msgType) + " operation."); |
| } |
| } |
| |
| setNumberOfParts(numParts); |
| if (numParts <= 0) { |
| return; |
| } |
| |
| if (len < 0) { |
| logger.info("rpl: neg len: {}", len); |
| throw new IOException("Dead Connection"); |
| } |
| |
| final ByteBuffer cb = getCommBuffer(); |
| cb.clear(); |
| cb.flip(); |
| |
| int readSecurePart = checkAndSetSecurityPart(); |
| |
| int bytesRemaining = len; |
| for (int i = 0; i < numParts + readSecurePart |
| || readSecurePart == 1 && cb.remaining() > 0; i++) { |
| int bytesReadThisTime = readPartChunk(bytesRemaining); |
| bytesRemaining -= bytesReadThisTime; |
| |
| Part part; |
| |
| if (i < numParts) { |
| part = this.partsList[i]; |
| } else { |
| part = this.securePart; |
| } |
| |
| int partLen = cb.getInt(); |
| byte partType = cb.get(); |
| byte[] partBytes = null; |
| |
| if (partLen > 0) { |
| partBytes = new byte[partLen]; |
| int alreadyReadBytes = cb.remaining(); |
| if (alreadyReadBytes > 0) { |
| if (partLen < alreadyReadBytes) { |
| alreadyReadBytes = partLen; |
| } |
| cb.get(partBytes, 0, alreadyReadBytes); |
| } |
| |
| // now we need to read partLen - alreadyReadBytes off the wire |
| int off = alreadyReadBytes; |
| int remaining = partLen - off; |
| while (remaining > 0) { |
| if (this.socketChannel != null) { |
| int bytesThisTime = remaining; |
| cb.clear(); |
| if (bytesThisTime > cb.capacity()) { |
| bytesThisTime = cb.capacity(); |
| } |
| cb.limit(bytesThisTime); |
| int res = this.socketChannel.read(cb); |
| if (res != -1) { |
| cb.flip(); |
| bytesRemaining -= res; |
| remaining -= res; |
| cb.get(partBytes, off, res); |
| off += res; |
| if (this.messageStats != null) { |
| this.messageStats.incReceivedBytes(res); |
| } |
| } else { |
| throw new EOFException( |
| "The connection has been reset while reading a part"); |
| } |
| } else { |
| int res = this.inputStream.read(partBytes, off, remaining); |
| if (res != -1) { |
| bytesRemaining -= res; |
| remaining -= res; |
| off += res; |
| if (this.messageStats != null) { |
| this.messageStats.incReceivedBytes(res); |
| } |
| } else { |
| throw new EOFException( |
| "The connection has been reset while reading a part"); |
| } |
| } |
| } |
| } |
| part.init(partBytes, partType); |
| } |
| } |
| |
| protected int checkAndSetSecurityPart() { |
| if ((this.flags | MESSAGE_HAS_SECURE_PART) == this.flags) { |
| this.securePart = new Part(); |
| return 1; |
| } else { |
| this.securePart = null; |
| return 0; |
| } |
| } |
| |
| /** |
| * @param bytesRemaining the most bytes we can read |
| * @return the number of bytes read into commBuffer |
| */ |
| private int readPartChunk(int bytesRemaining) throws IOException { |
| final ByteBuffer commBuffer = getCommBuffer(); |
| if (commBuffer.remaining() >= PART_HEADER_SIZE) { |
| // we already have the next part header in commBuffer so just return |
| return 0; |
| } |
| |
| if (commBuffer.position() != 0) { |
| commBuffer.compact(); |
| } else { |
| commBuffer.position(commBuffer.limit()); |
| commBuffer.limit(commBuffer.capacity()); |
| } |
| |
| if (this.serverConnection != null) { |
| // Keep track of the fact that we are making progress |
| this.serverConnection.updateProcessingMessage(); |
| } |
| int bytesRead = 0; |
| |
| if (this.socketChannel != null) { |
| int remaining = commBuffer.remaining(); |
| if (remaining > bytesRemaining) { |
| remaining = bytesRemaining; |
| commBuffer.limit(commBuffer.position() + bytesRemaining); |
| } |
| while (remaining > 0) { |
| int res = this.socketChannel.read(commBuffer); |
| if (res != -1) { |
| remaining -= res; |
| bytesRead += res; |
| if (this.messageStats != null) { |
| this.messageStats.incReceivedBytes(res); |
| } |
| } else { |
| throw new EOFException( |
| "The connection has been reset while reading the payload"); |
| } |
| } |
| |
| } else { |
| int bytesToRead = commBuffer.capacity() - commBuffer.position(); |
| if (bytesRemaining < bytesToRead) { |
| bytesToRead = bytesRemaining; |
| } |
| int pos = commBuffer.position(); |
| |
| while (bytesToRead > 0) { |
| int res = this.inputStream.read(commBuffer.array(), pos, bytesToRead); |
| if (res != -1) { |
| bytesToRead -= res; |
| pos += res; |
| bytesRead += res; |
| if (this.messageStats != null) { |
| this.messageStats.incReceivedBytes(res); |
| } |
| } else { |
| throw new EOFException( |
| "The connection has been reset while reading the payload"); |
| } |
| } |
| |
| commBuffer.position(pos); |
| } |
| commBuffer.flip(); |
| return bytesRead; |
| } |
| |
| /** |
| * Gets rid of all the parts that have been added to this message. |
| */ |
| public void clearParts() { |
| for (Part part : this.partsList) { |
| part.clear(); |
| } |
| this.currentPart = 0; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("type=").append(MessageType.getString(this.messageType)); |
| sb.append("; payloadLength=").append(this.payloadLength); |
| sb.append("; numberOfParts=").append(this.numberOfParts); |
| sb.append("; hasSecurePart=").append(isSecureMode()); |
| sb.append("; transactionId=").append(this.transactionId); |
| sb.append("; currentPart=").append(this.currentPart); |
| sb.append("; messageModified=").append(this.messageModified); |
| sb.append("; flags=").append(Integer.toHexString(this.flags)); |
| for (int i = 0; i < this.numberOfParts; i++) { |
| sb.append("; part[").append(i).append("]={"); |
| sb.append(this.partsList[i]); |
| sb.append("}"); |
| } |
| return sb.toString(); |
| } |
| |
| // Set up a message on the server side. |
| void setComms(ServerConnection sc, Socket socket, ByteBuffer bb, MessageStats msgStats) |
| throws IOException { |
| this.serverConnection = sc; |
| setComms(socket, bb, msgStats); |
| } |
| |
| // Set up a message on the client side. |
| void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException { |
| this.socketChannel = socket.getChannel(); |
| if (this.socketChannel == null) { |
| setComms(socket, socket.getInputStream(), socket.getOutputStream(), bb, msgStats); |
| } else { |
| setComms(socket, null, null, bb, msgStats); |
| } |
| } |
| |
| // Set up a message on the client side. |
| public void setComms(Socket socket, InputStream is, OutputStream os, ByteBuffer bb, |
| MessageStats msgStats) { |
| Assert.assertTrue(socket != null); |
| this.socket = socket; |
| this.socketChannel = socket.getChannel(); |
| this.inputStream = is; |
| this.outputStream = os; |
| this.cachedCommBuffer = bb; |
| this.messageStats = msgStats; |
| } |
| |
| /** |
| * Undo any state changes done by setComms. |
| * |
| * @since GemFire 5.7 |
| */ |
| public void unsetComms() { |
| this.socket = null; |
| this.socketChannel = null; |
| this.inputStream = null; |
| this.outputStream = null; |
| this.cachedCommBuffer = null; |
| this.messageStats = null; |
| } |
| |
| /** |
| * Sends this message to its receiver over its setOutputStream?? output stream. |
| */ |
| public void send() throws IOException { |
| send(true); |
| } |
| |
| public void send(ServerConnection servConn) throws IOException { |
| if (this.serverConnection != servConn) |
| throw new IllegalStateException("this.sc was not correctly set"); |
| send(true); |
| } |
| |
| /** |
| * Sends this message to its receiver over its setOutputStream?? output stream. |
| */ |
| public void send(boolean clearMessage) throws IOException { |
| sendBytes(clearMessage); |
| } |
| |
| /** |
| * Read a message, populating the state of this {@code Message} with information received via its |
| * socket |
| * |
| * @param timeoutMillis timeout setting for reading the header (0 = no timeout) |
| */ |
| public void receiveWithHeaderReadTimeout(int timeoutMillis) throws IOException { |
| if (this.socket != null) { |
| synchronized (getCommBuffer()) { |
| readHeaderAndBody(true, timeoutMillis); |
| } |
| } else { |
| throw new IOException("Dead Connection"); |
| } |
| } |
| |
| /** |
| * Populates the state of this {@code Message} with information received via its socket |
| */ |
| public void receive() throws IOException { |
| if (this.socket != null) { |
| synchronized (getCommBuffer()) { |
| readHeaderAndBody(false, -1); |
| } |
| } else { |
| throw new IOException("Dead Connection"); |
| } |
| } |
| |
| public void receive(ServerConnection sc, int maxMessageLength, Semaphore dataLimiter, |
| Semaphore msgLimiter) throws IOException { |
| this.serverConnection = sc; |
| this.maxIncomingMessageLength = maxMessageLength; |
| this.dataLimiter = dataLimiter; |
| this.messageLimiter = msgLimiter; |
| receive(); |
| } |
| |
| } |