| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.artemis.core.client.impl; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.activemq.artemis.api.core.ActiveMQBuffer; |
| import org.apache.activemq.artemis.api.core.ActiveMQBuffers; |
| import org.apache.activemq.artemis.api.core.ActiveMQException; |
| import org.apache.activemq.artemis.api.core.Message; |
| import org.apache.activemq.artemis.api.core.SimpleString; |
| import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; |
| import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; |
| import org.apache.activemq.artemis.core.message.BodyEncoder; |
| import org.apache.activemq.artemis.core.message.impl.MessageInternal; |
| import org.apache.activemq.artemis.spi.core.remoting.SessionContext; |
| import org.apache.activemq.artemis.utils.DeflaterReader; |
| import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream; |
| import org.apache.activemq.artemis.utils.TokenBucketLimiter; |
| import org.apache.activemq.artemis.utils.UUIDGenerator; |
| import org.jboss.logging.Logger; |
| |
| /** |
| * The client-side Producer. |
| */ |
| public class ClientProducerImpl implements ClientProducerInternal { |
| |
| private static final Logger logger = Logger.getLogger(ClientProducerImpl.class); |
| |
| private final SimpleString address; |
| |
| private final ClientSessionInternal session; |
| |
| private final SessionContext sessionContext; |
| |
| private volatile boolean closed; |
| |
| // For rate throttling |
| |
| private final TokenBucketLimiter rateLimiter; |
| |
| private final boolean blockOnNonDurableSend; |
| |
| private final boolean blockOnDurableSend; |
| |
| private final SimpleString groupID; |
| |
| private final int minLargeMessageSize; |
| |
| private final ClientProducerCredits producerCredits; |
| |
| // Static --------------------------------------------------------------------------------------- |
| |
| // Constructors --------------------------------------------------------------------------------- |
| |
| public ClientProducerImpl(final ClientSessionInternal session, |
| final SimpleString address, |
| final TokenBucketLimiter rateLimiter, |
| final boolean blockOnNonDurableSend, |
| final boolean blockOnDurableSend, |
| final boolean autoGroup, |
| final SimpleString groupID, |
| final int minLargeMessageSize, |
| final SessionContext sessionContext) { |
| this.sessionContext = sessionContext; |
| |
| this.session = session; |
| |
| this.address = address; |
| |
| this.rateLimiter = rateLimiter; |
| |
| this.blockOnNonDurableSend = blockOnNonDurableSend; |
| |
| this.blockOnDurableSend = blockOnDurableSend; |
| |
| if (autoGroup) { |
| this.groupID = UUIDGenerator.getInstance().generateSimpleStringUUID(); |
| } |
| else { |
| this.groupID = groupID; |
| } |
| |
| this.minLargeMessageSize = minLargeMessageSize; |
| |
| if (address != null) { |
| producerCredits = session.getCredits(address, false); |
| } |
| else { |
| producerCredits = null; |
| } |
| } |
| |
| // ClientProducer implementation ---------------------------------------------------------------- |
| |
| @Override |
| public SimpleString getAddress() { |
| return address; |
| } |
| |
| @Override |
| public void send(final Message msg) throws ActiveMQException { |
| checkClosed(); |
| |
| doSend(null, msg, null, false); |
| } |
| |
| @Override |
| public void send(final SimpleString address1, final Message msg) throws ActiveMQException { |
| checkClosed(); |
| |
| doSend(address1, msg, null, false); |
| } |
| |
| @Override |
| public void send(final String address1, final Message message) throws ActiveMQException { |
| send(SimpleString.toSimpleString(address1), message); |
| } |
| |
| @Override |
| public void send(SimpleString address1, |
| Message message, |
| SendAcknowledgementHandler handler) throws ActiveMQException { |
| checkClosed(); |
| boolean confirmationWindowEnabled = session.isConfirmationWindowEnabled(); |
| if (confirmationWindowEnabled) { |
| doSend(address1, message, handler, true); |
| } |
| else { |
| doSend(address1, message, null, true); |
| if (handler != null) { |
| session.scheduleConfirmation(handler, message); |
| } |
| } |
| } |
| |
| @Override |
| public void send(Message message, SendAcknowledgementHandler handler) throws ActiveMQException { |
| send(null, message, handler); |
| } |
| |
| @Override |
| public synchronized void close() throws ActiveMQException { |
| if (closed) { |
| return; |
| } |
| |
| doCleanup(); |
| } |
| |
| @Override |
| public void cleanUp() { |
| if (closed) { |
| return; |
| } |
| |
| doCleanup(); |
| } |
| |
| @Override |
| public boolean isClosed() { |
| return closed; |
| } |
| |
| @Override |
| public boolean isBlockOnDurableSend() { |
| return blockOnDurableSend; |
| } |
| |
| @Override |
| public boolean isBlockOnNonDurableSend() { |
| return blockOnNonDurableSend; |
| } |
| |
| @Override |
| public int getMaxRate() { |
| return rateLimiter == null ? -1 : rateLimiter.getRate(); |
| } |
| |
| // Public --------------------------------------------------------------------------------------- |
| |
| @Override |
| public ClientProducerCredits getProducerCredits() { |
| return producerCredits; |
| } |
| |
| private void doCleanup() { |
| if (address != null) { |
| session.returnCredits(address); |
| } |
| |
| session.removeProducer(this); |
| |
| closed = true; |
| } |
| |
| private void doSend(SimpleString sendingAddress, |
| final Message msg, |
| final SendAcknowledgementHandler handler, |
| final boolean forceAsync) throws ActiveMQException { |
| if (sendingAddress == null) { |
| sendingAddress = this.address; |
| } |
| session.startCall(); |
| |
| try { |
| MessageInternal msgI = (MessageInternal) msg; |
| |
| ClientProducerCredits theCredits; |
| |
| boolean isLarge; |
| // a note about the second check on the writerIndexSize, |
| // If it's a server's message, it means this is being done through the bridge or some special consumer on the |
| // server's on which case we can't' convert the message into large at the servers |
| if (sessionContext.supportsLargeMessage() && (msgI.getBodyInputStream() != null || msgI.isLargeMessage() || |
| msgI.getBodyBuffer().writerIndex() > minLargeMessageSize && !msgI.isServerMessage())) { |
| isLarge = true; |
| } |
| else { |
| isLarge = false; |
| } |
| |
| if (!isLarge) { |
| session.setAddress(msg, sendingAddress); |
| } |
| else { |
| msg.setAddress(sendingAddress); |
| } |
| |
| // Anonymous |
| theCredits = session.getCredits(sendingAddress, true); |
| |
| if (rateLimiter != null) { |
| // Rate flow control |
| |
| rateLimiter.limit(); |
| } |
| |
| if (groupID != null) { |
| msgI.putStringProperty(Message.HDR_GROUP_ID, groupID); |
| } |
| |
| final boolean sendBlockingConfig = msgI.isDurable() ? blockOnDurableSend : blockOnNonDurableSend; |
| final boolean forceAsyncOverride = handler != null; |
| final boolean sendBlocking = sendBlockingConfig && !forceAsyncOverride; |
| |
| session.workDone(); |
| |
| if (isLarge) { |
| largeMessageSend(sendBlocking, msgI, theCredits, handler); |
| } |
| else { |
| sendRegularMessage(sendingAddress, msgI, sendBlocking, theCredits, handler); |
| } |
| } |
| finally { |
| session.endCall(); |
| } |
| } |
| |
| private void sendRegularMessage(final SimpleString sendingAddress, |
| final MessageInternal msgI, |
| final boolean sendBlocking, |
| final ClientProducerCredits theCredits, |
| final SendAcknowledgementHandler handler) throws ActiveMQException { |
| // This will block if credits are not available |
| |
| // Note, that for a large message, the encode size only includes the properties + headers |
| // Not the continuations, but this is ok since we are only interested in limiting the amount of |
| // data in *memory* and continuations go straight to the disk |
| |
| logger.tracef("sendRegularMessage::%s, Blocking=%s", msgI, sendBlocking); |
| |
| int creditSize = sessionContext.getCreditsOnSendingFull(msgI); |
| |
| theCredits.acquireCredits(creditSize); |
| |
| session.checkDefaultAddress(sendingAddress); |
| |
| sessionContext.sendFullMessage(msgI, sendBlocking, handler, address); |
| } |
| |
| private void checkClosed() throws ActiveMQException { |
| if (closed) { |
| throw ActiveMQClientMessageBundle.BUNDLE.producerClosed(); |
| } |
| } |
| |
| // Methods to send Large Messages---------------------------------------------------------------- |
| |
| /** |
| * @param msgI |
| * @param handler |
| * @throws ActiveMQException |
| */ |
| private void largeMessageSend(final boolean sendBlocking, |
| final MessageInternal msgI, |
| final ClientProducerCredits credits, |
| SendAcknowledgementHandler handler) throws ActiveMQException { |
| logger.tracef("largeMessageSend::%s, Blocking=%s", msgI, sendBlocking); |
| |
| int headerSize = msgI.getHeadersAndPropertiesEncodeSize(); |
| |
| if (msgI.getHeadersAndPropertiesEncodeSize() >= minLargeMessageSize) { |
| throw ActiveMQClientMessageBundle.BUNDLE.headerSizeTooBig(headerSize); |
| } |
| |
| // msg.getBody() could be Null on LargeServerMessage |
| if (msgI.getBodyInputStream() == null && msgI.getWholeBuffer() != null) { |
| msgI.getWholeBuffer().readerIndex(0); |
| } |
| |
| InputStream input; |
| |
| if (msgI.isServerMessage()) { |
| largeMessageSendServer(sendBlocking, msgI, credits, handler); |
| } |
| else if ((input = msgI.getBodyInputStream()) != null) { |
| largeMessageSendStreamed(sendBlocking, msgI, input, credits, handler); |
| } |
| else { |
| largeMessageSendBuffered(sendBlocking, msgI, credits, handler); |
| } |
| } |
| |
| private void sendInitialLargeMessageHeader(MessageInternal msgI, |
| ClientProducerCredits credits) throws ActiveMQException { |
| int creditsUsed = sessionContext.sendInitialChunkOnLargeMessage(msgI); |
| |
| // On the case of large messages we tried to send credits before but we would starve otherwise |
| // we may find a way to improve the logic and always acquire the credits before |
| // but that's the way it's been tested and been working ATM |
| credits.acquireCredits(creditsUsed); |
| } |
| |
| /** |
| * Used to send serverMessages through the bridges. No need to validate compression here since |
| * the message is only compressed at the client |
| * |
| * @param sendBlocking |
| * @param msgI |
| * @param handler |
| * @throws ActiveMQException |
| */ |
| private void largeMessageSendServer(final boolean sendBlocking, |
| final MessageInternal msgI, |
| final ClientProducerCredits credits, |
| SendAcknowledgementHandler handler) throws ActiveMQException { |
| sendInitialLargeMessageHeader(msgI, credits); |
| |
| BodyEncoder context = msgI.getBodyEncoder(); |
| |
| final long bodySize = context.getLargeBodySize(); |
| |
| final int reconnectID = sessionContext.getReconnectID(); |
| |
| context.open(); |
| try { |
| |
| for (long pos = 0; pos < bodySize; ) { |
| final boolean lastChunk; |
| |
| final int chunkLength = (int) Math.min((bodySize - pos), minLargeMessageSize); |
| |
| final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength); |
| |
| context.encode(bodyBuffer, chunkLength); |
| |
| pos += chunkLength; |
| |
| lastChunk = pos >= bodySize; |
| SendAcknowledgementHandler messageHandler = lastChunk ? handler : null; |
| |
| int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler); |
| |
| credits.acquireCredits(creditsUsed); |
| } |
| } |
| finally { |
| context.close(); |
| } |
| } |
| |
| /** |
| * @param sendBlocking |
| * @param msgI |
| * @param handler |
| * @throws ActiveMQException |
| */ |
| private void largeMessageSendBuffered(final boolean sendBlocking, |
| final MessageInternal msgI, |
| final ClientProducerCredits credits, |
| SendAcknowledgementHandler handler) throws ActiveMQException { |
| msgI.getBodyBuffer().readerIndex(0); |
| largeMessageSendStreamed(sendBlocking, msgI, new ActiveMQBufferInputStream(msgI.getBodyBuffer()), credits, handler); |
| } |
| |
| /** |
| * @param sendBlocking |
| * @param msgI |
| * @param inputStreamParameter |
| * @param credits |
| * @throws ActiveMQException |
| */ |
| private void largeMessageSendStreamed(final boolean sendBlocking, |
| final MessageInternal msgI, |
| final InputStream inputStreamParameter, |
| final ClientProducerCredits credits, |
| SendAcknowledgementHandler handler) throws ActiveMQException { |
| boolean lastPacket = false; |
| |
| InputStream input = inputStreamParameter; |
| |
| // We won't know the real size of the message since we are compressing while reading the streaming. |
| // This counter will be passed to the deflater to be updated for every byte read |
| AtomicLong messageSize = new AtomicLong(); |
| |
| DeflaterReader deflaterReader = null; |
| |
| if (session.isCompressLargeMessages()) { |
| msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true); |
| deflaterReader = new DeflaterReader(inputStreamParameter, messageSize); |
| input = deflaterReader; |
| } |
| |
| long totalSize = 0; |
| |
| boolean headerSent = false; |
| |
| |
| int reconnectID = sessionContext.getReconnectID(); |
| while (!lastPacket) { |
| byte[] buff = new byte[minLargeMessageSize]; |
| |
| int pos = 0; |
| |
| do { |
| int numberOfBytesRead; |
| |
| int wanted = minLargeMessageSize - pos; |
| |
| try { |
| numberOfBytesRead = input.read(buff, pos, wanted); |
| } |
| catch (IOException e) { |
| throw ActiveMQClientMessageBundle.BUNDLE.errorReadingBody(e); |
| } |
| |
| if (numberOfBytesRead == -1) { |
| lastPacket = true; |
| |
| break; |
| } |
| |
| pos += numberOfBytesRead; |
| } while (pos < minLargeMessageSize); |
| |
| totalSize += pos; |
| |
| if (lastPacket) { |
| if (!session.isCompressLargeMessages()) { |
| messageSize.set(totalSize); |
| } |
| |
| // This is replacing the last packet by a smaller packet |
| byte[] buff2 = new byte[pos]; |
| |
| System.arraycopy(buff, 0, buff2, 0, pos); |
| |
| buff = buff2; |
| |
| // This is the case where the message is being converted as a regular message |
| if (!headerSent && session.isCompressLargeMessages() && buff2.length < minLargeMessageSize) { |
| msgI.getBodyBuffer().resetReaderIndex(); |
| msgI.getBodyBuffer().resetWriterIndex(); |
| msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize()); |
| |
| msgI.getBodyBuffer().writeBytes(buff, 0, pos); |
| sendRegularMessage(msgI.getAddress(), msgI, sendBlocking, credits, handler); |
| return; |
| } |
| else { |
| if (!headerSent) { |
| headerSent = true; |
| sendInitialLargeMessageHeader(msgI, credits); |
| } |
| int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler); |
| credits.acquireCredits(creditsSent); |
| } |
| } |
| else { |
| if (!headerSent) { |
| headerSent = true; |
| sendInitialLargeMessageHeader(msgI, credits); |
| } |
| |
| int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, handler); |
| credits.acquireCredits(creditsSent); |
| } |
| } |
| |
| try { |
| input.close(); |
| } |
| catch (IOException e) { |
| throw ActiveMQClientMessageBundle.BUNDLE.errorClosingLargeMessage(e); |
| } |
| } |
| } |