| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.client; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.security.GeneralSecurityException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.UUID; |
| |
| import javax.crypto.spec.SecretKeySpec; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.Queue; |
| import javax.jms.Topic; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.QpidException; |
| import org.apache.qpid.client.failover.FailoverException; |
| import org.apache.qpid.client.message.AMQMessageDelegate_0_8; |
| import org.apache.qpid.client.message.AbstractJMSMessage; |
| import org.apache.qpid.client.message.Encrypted091MessageFactory; |
| import org.apache.qpid.client.message.MessageEncryptionHelper; |
| import org.apache.qpid.client.message.QpidMessageProperties; |
| import org.apache.qpid.client.protocol.BlockingMethodFrameListener; |
| import org.apache.qpid.client.util.JMSExceptionHelper; |
| import org.apache.qpid.configuration.ClientProperties; |
| import org.apache.qpid.framing.AMQFrame; |
| import org.apache.qpid.framing.AMQMethodBody; |
| import org.apache.qpid.framing.BasicAckBody; |
| import org.apache.qpid.framing.BasicContentHeaderProperties; |
| import org.apache.qpid.framing.BasicNackBody; |
| import org.apache.qpid.framing.BasicPublishBody; |
| import org.apache.qpid.framing.CompositeAMQDataBlock; |
| import org.apache.qpid.framing.ContentBody; |
| import org.apache.qpid.framing.ContentHeaderBody; |
| import org.apache.qpid.framing.ExchangeDeclareBody; |
| import org.apache.qpid.framing.FieldTable; |
| import org.apache.qpid.framing.MethodRegistry; |
| import org.apache.qpid.util.GZIPUtils; |
| |
| public class BasicMessageProducer_0_8 extends BasicMessageProducer |
| { |
| private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_8.class); |
| private static final boolean SET_EXPIRATION_AS_TTL = Boolean.getBoolean(ClientProperties.SET_EXPIRATION_AS_TTL); |
| |
| BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, |
| AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws |
| QpidException |
| { |
| super(_logger,connection, destination,transacted,channelId,session, producerId, immediate, mandatory); |
| } |
| |
| void declareDestination(AMQDestination destination) throws QpidException |
| { |
| |
| if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR) |
| { |
| getSession().resolveAddress(destination, false, false); |
| |
| getSession().handleLinkCreation(destination); |
| getSession().sync(); |
| } |
| else |
| { |
| if (getSession().isDeclareExchanges() && !getSession().isResolved(destination)) |
| { |
| final MethodRegistry methodRegistry = getSession().getMethodRegistry(); |
| ExchangeDeclareBody body = |
| methodRegistry.createExchangeDeclareBody(getSession().getTicket(), |
| destination.getExchangeName(), |
| destination.getExchangeClass(), |
| destination.getExchangeName() |
| .startsWith("amq."), |
| destination.isExchangeDurable(), |
| destination.isExchangeAutoDelete(), |
| destination.isExchangeInternal(), |
| true, |
| null); |
| AMQFrame declare = body.generateFrame(getChannelId()); |
| |
| getConnection().getProtocolHandler().writeFrame(declare); |
| getSession().setResolved(destination); |
| } |
| } |
| } |
| |
| void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, |
| UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory, |
| boolean immediate, final long deliveryDelay) throws JMSException |
| { |
| |
| |
| AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate(); |
| BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties(); |
| |
| String routingKey = destination.getRoutingKey(); |
| |
| FieldTable headers = delegate.getContentHeaderProperties().getHeaders(); |
| |
| if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR && |
| (destination.getSubject() != null |
| || (headers != null && headers.get(QpidMessageProperties.QPID_SUBJECT) != null))) |
| { |
| |
| if (headers.get(QpidMessageProperties.QPID_SUBJECT) == null) |
| { |
| // use default subject in address string |
| headers.setString(QpidMessageProperties.QPID_SUBJECT, destination.getSubject()); |
| } |
| |
| if (destination.getAddressType() == AMQDestination.TOPIC_TYPE) |
| { |
| routingKey = headers.getString(QpidMessageProperties.QPID_SUBJECT); |
| } |
| } |
| |
| BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(), |
| destination.getExchangeName(), |
| routingKey, |
| mandatory, |
| immediate); |
| |
| AMQFrame publishFrame = body.generateFrame(getChannelId()); |
| |
| message.prepareForSending(); |
| ByteBuffer payload = message.getData(); |
| |
| contentHeaderProperties.setUserId(getUserID()); |
| |
| //Set the JMS_QPID_DESTTYPE for 0-8/9 messages |
| int type; |
| if (destination instanceof Topic) |
| { |
| type = AMQDestination.TOPIC_TYPE; |
| } |
| else if (destination instanceof Queue) |
| { |
| type = AMQDestination.QUEUE_TYPE; |
| } |
| else |
| { |
| type = AMQDestination.UNKNOWN_TYPE; |
| } |
| |
| //Set JMS_QPID_DESTTYPE |
| delegate.getContentHeaderProperties() |
| .getHeaders() |
| .setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type); |
| |
| long currentTime; |
| if (!isDisableTimestamps()) |
| { |
| |
| currentTime = System.currentTimeMillis(); |
| contentHeaderProperties.setTimestamp(currentTime); |
| |
| if (timeToLive > 0) |
| { |
| if (!SET_EXPIRATION_AS_TTL) |
| { |
| //default behaviour used by Qpid |
| contentHeaderProperties.setExpiration(currentTime + timeToLive); |
| } |
| else |
| { |
| //alternative behaviour for brokers interpreting the expiration header directly as a TTL. |
| contentHeaderProperties.setExpiration(timeToLive); |
| } |
| } |
| else |
| { |
| contentHeaderProperties.setExpiration(0); |
| } |
| } |
| else |
| { |
| currentTime = 0L; |
| } |
| |
| if(deliveryDelay != 0L && headers.get(QpidMessageProperties.QPID_NOT_VALID_BEFORE) == null) |
| { |
| if(currentTime == 0L) |
| { |
| currentTime = System.currentTimeMillis(); |
| } |
| headers.setLong(QpidMessageProperties.QPID_NOT_VALID_BEFORE, deliveryDelay+currentTime); |
| } |
| |
| |
| contentHeaderProperties.setDeliveryMode((byte) deliveryMode); |
| contentHeaderProperties.setPriority((byte) priority); |
| |
| int size = (payload != null) ? payload.remaining() : 0; |
| AMQFrame contentHeaderFrame; |
| final AMQFrame[] frames; |
| boolean encrypt = message.getBooleanProperty(MessageEncryptionHelper.ENCRYPT_HEADER) || destination.sendEncrypted(); |
| if(encrypt) |
| { |
| MessageEncryptionHelper encryptionHelper = getSession().getMessageEncryptionHelper(); |
| try |
| { |
| SecretKeySpec secretKey = encryptionHelper.createSecretKey(); |
| |
| contentHeaderProperties.getHeaders().remove(MessageEncryptionHelper.ENCRYPT_HEADER); |
| |
| String recipientString = message.getStringProperty(MessageEncryptionHelper.ENCRYPT_RECIPIENTS_HEADER); |
| if(recipientString == null) |
| { |
| recipientString = destination.getEncryptedRecipients(); |
| } |
| contentHeaderProperties.getHeaders().remove(MessageEncryptionHelper.ENCRYPT_RECIPIENTS_HEADER); |
| |
| String unencryptedProperties = message.getStringProperty(MessageEncryptionHelper.UNENCRYPTED_PROPERTIES_HEADER); |
| contentHeaderProperties.getHeaders().remove(MessageEncryptionHelper.UNENCRYPTED_PROPERTIES_HEADER); |
| |
| final int headerLength = contentHeaderProperties.getPropertyListSize() + 2; |
| byte[] unencryptedBytes = new byte[headerLength + size]; |
| ByteBuffer output = ByteBuffer.wrap(unencryptedBytes); |
| output.putShort((short) (contentHeaderProperties.getPropertyFlags() & 0xffff)); |
| contentHeaderProperties.writePropertyListPayload(output); |
| |
| if (size != 0) |
| { |
| payload.get(unencryptedBytes, headerLength, payload.remaining()); |
| } |
| |
| byte[] ivbytes = encryptionHelper.getInitialisationVector(); |
| |
| byte[] encryptedBytes = encryptionHelper.encrypt(secretKey, unencryptedBytes, ivbytes); |
| payload = ByteBuffer.wrap(encryptedBytes); |
| |
| if (recipientString == null) |
| { |
| throw new JMSException("When sending an encrypted message, recipients must be supplied"); |
| } |
| String[] recipients = recipientString.split(";"); |
| List<List<Object>> encryptedKeys = new ArrayList<>(); |
| for(MessageEncryptionHelper.KeyTransportRecipientInfo info : encryptionHelper.getKeyTransportRecipientInfo(Arrays.asList(recipients), secretKey)) |
| { |
| encryptedKeys.add(info.asList()); |
| } |
| |
| BasicContentHeaderProperties oldProps = contentHeaderProperties; |
| contentHeaderProperties = new BasicContentHeaderProperties(oldProps); |
| final FieldTable oldHeaders = oldProps.getHeaders(); |
| final FieldTable newHeaders = contentHeaderProperties.getHeaders(); |
| newHeaders.clear(); |
| |
| if(unencryptedProperties != null) |
| { |
| List<String> unencryptedPropertyNames = Arrays.asList(unencryptedProperties.split(" *; *")); |
| for (String propertyName : unencryptedPropertyNames) |
| { |
| if (oldHeaders.propertyExists(propertyName)) |
| { |
| newHeaders.setObject(propertyName, oldHeaders.get(propertyName)); |
| } |
| } |
| } |
| |
| newHeaders.setObject(MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY, encryptedKeys); |
| newHeaders.setString(MessageEncryptionHelper.ENCRYPTION_ALGORITHM_PROPERTY, |
| encryptionHelper.getMessageEncryptionCipherName()); |
| newHeaders.setBytes(MessageEncryptionHelper.KEY_INIT_VECTOR_PROPERTY, ivbytes); |
| contentHeaderProperties.setContentType(Encrypted091MessageFactory.ENCRYPTED_0_9_1_CONTENT_TYPE); |
| size = encryptedBytes.length; |
| |
| } |
| catch (GeneralSecurityException | IOException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException("Unexpected Exception while encrypting message"), e); |
| } |
| |
| } |
| else |
| { |
| byte[] compressed; |
| if (size > getConnection().getMessageCompressionThresholdSize() |
| && getConnection().getDelegate().isMessageCompressionSupported() |
| && getConnection().isMessageCompressionDesired() |
| && contentHeaderProperties.getEncoding() == null |
| && (compressed = GZIPUtils.compressBufferToArray(payload)) != null) |
| { |
| contentHeaderProperties.setEncoding("gzip"); |
| payload = ByteBuffer.wrap(compressed); |
| size = compressed.length; |
| |
| } |
| } |
| final int contentBodyFrameCount = calculateContentBodyFrameCount(payload); |
| frames = new AMQFrame[2 + contentBodyFrameCount]; |
| |
| if (payload != null) |
| { |
| createContentBodies(payload, frames, 2, getChannelId()); |
| } |
| |
| contentHeaderFrame = |
| ContentHeaderBody.createAMQFrame(getChannelId(), |
| contentHeaderProperties, size); |
| |
| |
| if (getLogger().isDebugEnabled()) |
| { |
| getLogger().debug("Sending " + (frames.length-2) + " content body frames to " + destination); |
| } |
| |
| if (contentHeaderFrame.getSize() > getSession().getAMQConnection().getMaximumFrameSize()) |
| { |
| throw new JMSException("Unable to send message as the headers are too large (" |
| + contentHeaderFrame.getSize() |
| + " bytes, but the maximum negotiated frame size is " |
| + getSession().getAMQConnection().getMaximumFrameSize() |
| + ")."); |
| } |
| if (getLogger().isDebugEnabled()) |
| { |
| getLogger().debug("Sending content header frame to " + destination); |
| } |
| |
| |
| frames[0] = publishFrame; |
| frames[1] = contentHeaderFrame; |
| final CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); |
| |
| try |
| { |
| getSession().checkFlowControl(); |
| } |
| catch (InterruptedException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException( |
| "Interrupted while waiting for flow control to be removed"), e); |
| } |
| |
| AMQConnectionDelegate_8_0 connectionDelegate80 = (AMQConnectionDelegate_8_0) (getConnection().getDelegate()); |
| |
| boolean useConfirms = getPublishMode() == PublishMode.SYNC_PUBLISH_ALL |
| && (connectionDelegate80.isConfirmedPublishSupported() |
| || (!getSession().isTransacted() && connectionDelegate80.isConfirmedPublishNonTransactionalSupported())); |
| |
| if(!useConfirms) |
| { |
| getConnection().getProtocolHandler().writeFrame(compositeFrame); |
| } |
| else |
| { |
| final PublishConfirmMessageListener frameListener = new PublishConfirmMessageListener(getChannelId()); |
| try |
| { |
| |
| getConnection().getProtocolHandler().writeCommandFrameAndWaitForReply(compositeFrame, |
| frameListener); |
| |
| if(frameListener.isRejected()) |
| { |
| throw new JMSException("The message was not accepted by the server (e.g. because the address was no longer valid)"); |
| } |
| } |
| catch (QpidException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException(e.getMessage()), e); |
| } |
| catch (FailoverException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException( |
| "Fail-over interrupted send. Status of the send is uncertain."), e); |
| |
| } |
| } |
| } |
| |
| /** |
| * Create content bodies. This will split a large message into numerous bodies depending on the negotiated |
| * maximum frame size. |
| * |
| * @param payload |
| * @param frames |
| * @param offset |
| * @param channelId @return the array of content bodies |
| */ |
| private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId) |
| { |
| |
| if (frames.length == (offset + 1)) |
| { |
| frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice())); |
| } |
| else |
| { |
| |
| final long framePayloadMax = getMaximumPayloadSize(); |
| long remaining = payload.remaining(); |
| for (int i = offset; i < frames.length; i++) |
| { |
| payload.position((int) framePayloadMax * (i - offset)); |
| int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; |
| payload.limit(payload.position() + length); |
| |
| frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice())); |
| |
| remaining -= length; |
| } |
| } |
| |
| } |
| |
| private int calculateContentBodyFrameCount(ByteBuffer payload) |
| { |
| // we subtract one from the total frame maximum size to account for the end of frame marker in a body frame |
| // (0xCE byte). |
| int frameCount; |
| if ((payload == null) || (payload.remaining() == 0)) |
| { |
| frameCount = 0; |
| } |
| else |
| { |
| int dataLength = payload.remaining(); |
| final long framePayloadMax = getMaximumPayloadSize(); |
| int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; |
| frameCount = (int) (dataLength / framePayloadMax) + lastFrame; |
| } |
| |
| return frameCount; |
| } |
| |
| private long getMaximumPayloadSize() |
| { |
| return getSession().getAMQConnection().getMaximumFrameSize() - 8; |
| } |
| |
| @Override |
| public AMQSession_0_8 getSession() |
| { |
| return (AMQSession_0_8) super.getSession(); |
| } |
| |
| private static class PublishConfirmMessageListener extends BlockingMethodFrameListener |
| { |
| private boolean _rejected; |
| |
| /** |
| * Creates a new method listener, that filters incoming method to just those that match the specified channel id. |
| * |
| * @param channelId The channel id to filter incoming methods with. |
| */ |
| public PublishConfirmMessageListener(final int channelId) |
| { |
| super(channelId); |
| } |
| |
| @Override |
| public boolean processMethod(final int channelId, final AMQMethodBody frame) |
| { |
| if (frame instanceof BasicAckBody) |
| { |
| return true; |
| } |
| else if (frame instanceof BasicNackBody) |
| { |
| _rejected = true; |
| return true; |
| } |
| else |
| { |
| return false; |
| } |
| } |
| |
| public boolean isRejected() |
| { |
| return _rejected; |
| } |
| } |
| } |