blob: 2b55d294b59f9a67dfb98aa3f4d5e97c9e2d153d [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
/*
* This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
* Supported AMQP versions:
* 8-0
*/
package org.apache.qpid.server.output.amqp0_8;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.AMQException;
import org.apache.mina.common.ByteBuffer;
import java.util.Iterator;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
public static Factory getInstanceFactory()
{
return new Factory()
{
public ProtocolOutputConverter newInstance(AMQProtocolSession session)
{
return new ProtocolOutputConverterImpl(session);
}
};
}
private final AMQProtocolSession _protocolSession;
private ProtocolOutputConverterImpl(AMQProtocolSession session)
{
_protocolSession = session;
}
public AMQProtocolSession getProtocolSession()
{
return _protocolSession;
}
public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
final AMQMessageHandle messageHandle = message.getMessageHandle();
final StoreContext storeContext = message.getStoreContext();
final int bodyCount = messageHandle.getBodyCount(storeContext);
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
contentHeader);
writeFrame(compositeBlock);
}
else
{
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
// Now start writing out the other content bodies
//
for(int i = 1; i < bodyCount; i++)
{
cb = messageHandle.getContentChunk(storeContext, i);
writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
}
}
public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
{
final AMQMessageHandle messageHandle = message.getMessageHandle();
final StoreContext storeContext = message.getStoreContext();
AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
final int bodyCount = messageHandle.getBodyCount(storeContext);
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
contentHeader);
writeFrame(compositeBlock);
}
else
{
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
// Now start writing out the other content bodies
//
for(int i = 1; i < bodyCount; i++)
{
cb = messageHandle.getContentChunk(storeContext, i);
writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
}
}
private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicDeliverBody deliverBody =
methodRegistry.createBasicDeliverBody(consumerTag,
deliveryTag,
messageHandle.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey());
AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
return deliverFrame;
}
private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicGetOkBody getOkBody =
methodRegistry.createBasicGetOkBody(deliveryTag,
messageHandle.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey(),
queueSize);
AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
return getOkFrame;
}
public byte getProtocolMinorVersion()
{
return getProtocolSession().getProtocolMinorVersion();
}
public byte getProtocolMajorVersion()
{
return getProtocolSession().getProtocolMajorVersion();
}
private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicReturnBody basicReturnBody =
methodRegistry.createBasicReturnBody(replyCode,
replyText,
message.getMessagePublishInfo().getExchange(),
message.getMessagePublishInfo().getRoutingKey());
AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
return returnFrame;
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
if (bodyFrameIterator.hasNext())
{
AMQDataBlock firstContentBody = bodyFrameIterator.next();
AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
}
else
{
CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
writeFrame(compositeBlock);
}
//
// Now start writing out the other content bodies
// TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
//
while (bodyFrameIterator.hasNext())
{
writeFrame(bodyFrameIterator.next());
}
}
public void writeFrame(AMQDataBlock block)
{
getProtocolSession().writeFrame(block);
}
public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
}