blob: 65ca9bfea38fa97b6e3816815be47a1c9958ff7b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client.message;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
/**
* This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
* the content body/ies.
*
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
public class UnprocessedMessage_0_8 extends UnprocessedMessage
{
private long _bytesReceived = 0;
private static final Logger LOGGER = LoggerFactory.getLogger(UnprocessedMessage_0_8.class);
private AMQShortString _exchange;
private AMQShortString _routingKey;
private final long _deliveryId;
private boolean _redelivered;
private BasicDeliverBody _deliverBody;
private ContentHeaderBody _contentHeader;
/** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
private List<ContentBody> _bodies;
public UnprocessedMessage_0_8(long deliveryId, String consumerTag, AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
{
super(consumerTag);
_exchange = exchange;
_routingKey = routingKey;
_redelivered = redelivered;
_deliveryId = deliveryId;
}
public AMQShortString getExchange()
{
return _exchange;
}
public AMQShortString getRoutingKey()
{
return _routingKey;
}
public long getDeliveryTag()
{
return _deliveryId;
}
public boolean isRedelivered()
{
return _redelivered;
}
public void receiveBody(ContentBody body)
{
if (body.getPayload() != null)
{
final long payloadSize = body.getPayload().remaining();
if (_bodies == null)
{
if (payloadSize == getContentHeader().getBodySize())
{
_bodies = Collections.singletonList(body);
}
else
{
_bodies = new ArrayList<ContentBody>();
_bodies.add(body);
}
}
else
{
_bodies.add(body);
}
_bytesReceived += payloadSize;
}
}
public void setMethodBody(BasicDeliverBody deliverBody)
{
_deliverBody = deliverBody;
}
public void setContentHeader(ContentHeaderBody contentHeader)
{
this._contentHeader = contentHeader;
}
public boolean isAllBodyDataReceived()
{
LOGGER.debug("Received {} of {} bytes for message body", _bytesReceived, getContentHeader().getBodySize());
return _bytesReceived == getContentHeader().getBodySize();
}
public BasicDeliverBody getDeliverBody()
{
return _deliverBody;
}
public ContentHeaderBody getContentHeader()
{
return _contentHeader;
}
public List<ContentBody> getBodies()
{
return _bodies;
}
public String toString()
{
StringBuilder buf = new StringBuilder();
if (_contentHeader != null)
{
buf.append("ContentHeader " + _contentHeader);
}
if(_deliverBody != null)
{
buf.append("Delivery tag " + _deliverBody.getDeliveryTag());
buf.append("Consumer tag " + _deliverBody.getConsumerTag());
buf.append("Deliver Body " + _deliverBody);
}
return buf.toString();
}
}