blob: 21ff6c877acf33ffca0a3cb34ce50330b3772641 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.CompositeAMQDataBlock;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.MethodRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Topic;
import java.nio.ByteBuffer;
import java.util.UUID;
/**
 * Message producer that publishes each message as a basic.publish method frame, followed by a content
 * header frame and zero or more content body frames, all written to the protocol handler as a single
 * composite data block.
 */
public class BasicMessageProducer_0_8 extends BasicMessageProducer
{
    private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_8.class);

    /**
     * Creates the producer; every argument (together with this class's logger) is passed straight through
     * to the superclass constructor.
     */
    BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
                             AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
    {
        super(_logger,connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
    }

    /**
     * Builds an exchange.declare frame for the destination's exchange and writes it on this producer's
     * channel.
     *
     * @param destination the destination whose exchange name and exchange class are declared
     */
    void declareDestination(AMQDestination destination)
    {
        final MethodRegistry methodRegistry = getSession().getMethodRegistry();

        // NOTE(review): the first boolean is true only for exchange names starting with "amq.".
        // It is presumably the 'passive' flag (so reserved exchanges are only checked, never created),
        // and the trailing 'true' is presumably 'nowait' - confirm against MethodRegistry. An earlier
        // comment here claimed passive was always false, which does not match the code.
        final boolean reservedExchange = destination.getExchangeName().toString().startsWith("amq.");

        ExchangeDeclareBody body =
                methodRegistry.createExchangeDeclareBody(getSession().getTicket(),
                                                         destination.getExchangeName(),
                                                         destination.getExchangeClass(),
                                                         reservedExchange,
                                                         false,
                                                         false,
                                                         false,
                                                         true,
                                                         null);

        // Declare the exchange
        AMQFrame declare = body.generateFrame(getChannelId());
        getProtocolHandler().writeFrame(declare);
    }

    /**
     * Publishes a message: populates the content header properties (user id, destination type, timestamp,
     * expiration, delivery mode, priority), splits the payload into content body frames, waits for flow
     * control, and writes publish + header + body frames as one composite block.
     *
     * @param destination  destination supplying the exchange name and routing key
     * @param origMessage  not used by this implementation
     * @param message      the message to send; {@code prepareForSending()} is invoked on it
     * @param messageId    not used by this implementation
     * @param deliveryMode delivery mode, narrowed to a byte in the content header
     * @param priority     priority, narrowed to a byte in the content header
     * @param timeToLive   time to live in milliseconds; values &lt;= 0 result in an expiration of 0
     * @param mandatory    passed to the basic.publish body
     * @param immediate    passed to the basic.publish body
     * @throws JMSException if the thread is interrupted while waiting on flow control
     */
    void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
                     UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
                     boolean immediate) throws JMSException
    {
        BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(),
                                                                                        destination.getExchangeName(),
                                                                                        destination.getRoutingKey(),
                                                                                        mandatory,
                                                                                        immediate);

        AMQFrame publishFrame = body.generateFrame(getChannelId());

        message.prepareForSending();
        ByteBuffer payload = message.getData();

        AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
        BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();

        contentHeaderProperties.setUserId(getUserID());

        // Set the JMS_QPID_DESTTYPE for 0-8/9 messages
        contentHeaderProperties.getHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(),
                                                        destinationType(destination));

        if (!isDisableTimestamps())
        {
            final long currentTime = System.currentTimeMillis();
            contentHeaderProperties.setTimestamp(currentTime);

            if (timeToLive > 0)
            {
                contentHeaderProperties.setExpiration(currentTime + timeToLive);
            }
            else
            {
                contentHeaderProperties.setExpiration(0);
            }
        }

        contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
        contentHeaderProperties.setPriority((byte) priority);

        // The advertised body size must equal the bytes actually placed in the body frames. Those are
        // taken from the buffer's current position up to its limit, so use remaining() rather than
        // limit(); the two only coincide when the position is 0.
        final int size = (payload != null) ? payload.remaining() : 0;

        final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);

        // Slots 0 and 1 are reserved for the publish and content header frames.
        final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];

        if (payload != null)
        {
            createContentBodies(payload, frames, 2, getChannelId());
        }

        if ((contentBodyFrameCount != 0) && getLogger().isDebugEnabled())
        {
            getLogger().debug("Sending content body frames to " + destination);
        }

        // TODO: This is a hacky way of getting the AMQP class-id for the Basic class
        int classIdForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();

        AMQFrame contentHeaderFrame =
                ContentHeaderBody.createAMQFrame(getChannelId(),
                                                 classIdForBasic, 0, contentHeaderProperties, size);

        if (getLogger().isDebugEnabled())
        {
            getLogger().debug("Sending content header frame to " + destination);
        }

        frames[0] = publishFrame;
        frames[1] = contentHeaderFrame;
        CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);

        try
        {
            getSession().checkFlowControl();
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt status so that code further up the stack can still observe it;
            // catching InterruptedException clears the flag.
            Thread.currentThread().interrupt();

            JMSException jmse = new JMSException("Interrupted while waiting for flow control to be removed");
            jmse.setLinkedException(e);
            jmse.initCause(e);
            throw jmse;
        }

        getProtocolHandler().writeFrame(compositeFrame);
    }

    /**
     * Maps a destination onto the integer stored in the JMS_QPID_DESTTYPE header.
     *
     * @param destination the destination being published to
     * @return {@code TOPIC_TYPE} for a {@link Topic}, otherwise {@code QUEUE_TYPE} for a {@link Queue},
     *         otherwise {@code UNKNOWN_TYPE}
     */
    private static int destinationType(AMQDestination destination)
    {
        if (destination instanceof Topic)
        {
            return AMQDestination.TOPIC_TYPE;
        }
        if (destination instanceof Queue)
        {
            return AMQDestination.QUEUE_TYPE;
        }
        return AMQDestination.UNKNOWN_TYPE;
    }

    /**
     * Create content bodies. This will split a large message into numerous bodies depending on the
     * negotiated maximum frame size. The bytes are copied out of {@code payload} starting at its current
     * position, which is advanced as the data is consumed.
     *
     * @param payload   buffer holding the message body
     * @param frames    array receiving the body frames; slots from {@code offset} to the end are filled
     * @param offset    index of the first slot in {@code frames} to fill
     * @param channelId channel id the frames are created for
     */
    private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
    {
        if (frames.length == (offset + 1))
        {
            // Exactly one body frame: it carries everything that remains in the buffer.
            byte[] data = new byte[payload.remaining()];
            payload.get(data);
            frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(data));
        }
        else
        {
            final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1;
            int remaining = payload.remaining();

            // Read the chunks sequentially from the current position instead of repositioning with
            // absolute offsets, so a payload whose position is not 0 is still split correctly.
            for (int i = offset; i < frames.length; i++)
            {
                final int length = (int) Math.min(remaining, framePayloadMax);
                byte[] data = new byte[length];
                payload.get(data);
                frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(data));

                remaining -= length;
            }
        }
    }

    /**
     * Computes how many content body frames are needed for the payload.
     *
     * @param payload the message body, may be {@code null}
     * @return 0 for a {@code null} or empty payload, otherwise the remaining byte count divided by the
     *         per-frame payload maximum, rounded up
     */
    private int calculateContentBodyFrameCount(ByteBuffer payload)
    {
        // we subtract one from the total frame maximum size to account for the end of frame marker in a body frame
        // (0xCE byte).
        int frameCount;
        if ((payload == null) || (payload.remaining() == 0))
        {
            frameCount = 0;
        }
        else
        {
            int dataLength = payload.remaining();
            final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1;
            // One extra frame for a trailing partial chunk.
            int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
            frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
        }

        return frameCount;
    }
}