| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #include <boost/cast.hpp> |
| |
| #include <BrokerMessage.h> |
| #include <iostream> |
| |
| #include <InMemoryContent.h> |
| #include <LazyLoadedContent.h> |
| #include <MessageStore.h> |
| #include <BasicDeliverBody.h> |
| #include <BasicGetOkBody.h> |
| #include <AMQContentBody.h> |
| #include <AMQHeaderBody.h> |
| #include "AMQMethodBody.h" |
| #include "AMQFrame.h" |
| #include "framing/ChannelAdapter.h" |
| |
| using namespace boost; |
| using namespace qpid::broker; |
| using namespace qpid::framing; |
| using namespace qpid::sys; |
| |
| BasicMessage::BasicMessage( |
| const ConnectionToken* const _publisher, |
| const string& _exchange, const string& _routingKey, |
| bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo |
| ) : |
| Message(_publisher, _exchange, _routingKey, _mandatory, |
| _immediate, respondTo), |
| size(0) |
| {} |
| |
| // FIXME aconway 2007-02-01: remove. |
| // BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize) : |
| // publisher(0), size(0) |
| // { |
| |
| // decode(buffer, headersOnly, contentChunkSize); |
| // } |
| |
| // For tests only. |
| BasicMessage::BasicMessage() : size(0) |
| {} |
| |
| BasicMessage::~BasicMessage(){} |
| |
| void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){ |
| this->header = _header; |
| } |
| |
| void BasicMessage::addContent(AMQContentBody::shared_ptr data){ |
| if (!content.get()) { |
| content = std::auto_ptr<Content>(new InMemoryContent()); |
| } |
| content->add(data); |
| size += data->size(); |
| } |
| |
| bool BasicMessage::isComplete(){ |
| return header.get() && (header->getContentSize() == contentSize()); |
| } |
| |
| void BasicMessage::deliver(ChannelAdapter& channel, |
| const string& consumerTag, uint64_t deliveryTag, |
| uint32_t framesize) |
| { |
| // CCT -- TODO - Update code generator to take pointer/ not |
| // instance to avoid extra contruction |
| channel.send( |
| new BasicDeliverBody( |
| channel.getVersion(), consumerTag, deliveryTag, |
| getRedelivered(), getExchange(), getRoutingKey())); |
| sendContent(channel, framesize); |
| } |
| |
| void BasicMessage::sendGetOk(const MethodContext& context, |
| const std::string& /*destination*/, |
| uint32_t messageCount, |
| uint64_t deliveryTag, |
| uint32_t framesize) |
| { |
| // CCT -- TODO - Update code generator to take pointer/ not |
| // instance to avoid extra contruction |
| context.channel->send( |
| new BasicGetOkBody( |
| context.channel->getVersion(), |
| context.methodBody->getRequestId(), |
| deliveryTag, getRedelivered(), getExchange(), |
| getRoutingKey(), messageCount)); |
| sendContent(*context.channel, framesize); |
| } |
| |
| void BasicMessage::sendContent( |
| ChannelAdapter& channel, uint32_t framesize) |
| { |
| channel.send(header); |
| Mutex::ScopedLock locker(contentLock); |
| if (content.get()) |
| content->send(channel, framesize); |
| } |
| |
| BasicHeaderProperties* BasicMessage::getHeaderProperties(){ |
| return boost::polymorphic_downcast<BasicHeaderProperties*>( |
| header->getProperties()); |
| } |
| |
| const FieldTable& BasicMessage::getApplicationHeaders(){ |
| return getHeaderProperties()->getHeaders(); |
| } |
| |
| bool BasicMessage::isPersistent() |
| { |
| if(!header) return false; |
| BasicHeaderProperties* props = getHeaderProperties(); |
| return props && props->getDeliveryMode() == PERSISTENT; |
| } |
| |
| void BasicMessage::decode(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize) |
| { |
| decodeHeader(buffer); |
| if (!headersOnly) decodeContent(buffer, contentChunkSize); |
| } |
| |
| void BasicMessage::decodeHeader(Buffer& buffer) |
| { |
| string exchange; |
| string routingKey; |
| |
| buffer.getShortString(exchange); |
| buffer.getShortString(routingKey); |
| setRouting(exchange, routingKey); |
| |
| uint32_t headerSize = buffer.getLong(); |
| AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody()); |
| headerBody->decode(buffer, headerSize); |
| setHeader(headerBody); |
| } |
| |
| void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize) |
| { |
| uint64_t expected = expectedContentSize(); |
| if (expected != buffer.available()) { |
| std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl; |
| throw Exception("Cannot decode content, buffer not large enough."); |
| } |
| |
| if (!chunkSize || chunkSize > expected) { |
| chunkSize = expected; |
| } |
| |
| uint64_t total = 0; |
| while (total < expectedContentSize()) { |
| uint64_t remaining = expected - total; |
| AMQContentBody::shared_ptr contentBody(new AMQContentBody()); |
| contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize); |
| addContent(contentBody); |
| total += chunkSize; |
| } |
| } |
| |
| void BasicMessage::encode(Buffer& buffer) |
| { |
| encodeHeader(buffer); |
| encodeContent(buffer); |
| } |
| |
| void BasicMessage::encodeHeader(Buffer& buffer) |
| { |
| buffer.putShortString(getExchange()); |
| buffer.putShortString(getRoutingKey()); |
| buffer.putLong(header->size()); |
| header->encode(buffer); |
| } |
| |
| void BasicMessage::encodeContent(Buffer& buffer) |
| { |
| Mutex::ScopedLock locker(contentLock); |
| if (content.get()) content->encode(buffer); |
| } |
| |
| uint32_t BasicMessage::encodedSize() |
| { |
| return encodedHeaderSize() + encodedContentSize(); |
| } |
| |
| uint32_t BasicMessage::encodedContentSize() |
| { |
| Mutex::ScopedLock locker(contentLock); |
| return content.get() ? content->size() : 0; |
| } |
| |
| uint32_t BasicMessage::encodedHeaderSize() |
| { |
| return getExchange().size() + 1 |
| + getRoutingKey().size() + 1 |
| + header->size() + 4;//4 extra bytes for size |
| } |
| |
| uint64_t BasicMessage::expectedContentSize() |
| { |
| return header.get() ? header->getContentSize() : 0; |
| } |
| |
| void BasicMessage::releaseContent(MessageStore* store) |
| { |
| Mutex::ScopedLock locker(contentLock); |
| if (!isPersistent() && getPersistenceId() == 0) { |
| store->stage(this); |
| } |
| if (!content.get() || content->size() > 0) { |
| // FIXME aconway 2007-02-07: handle MessageMessage. |
| //set content to lazy loading mode (but only if there is stored content): |
| |
| //Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is |
| // then set as a member of that message so its lifetime is guaranteed to be no longer than |
| // that of the message itself |
| content = std::auto_ptr<Content>( |
| new LazyLoadedContent(store, this, expectedContentSize())); |
| } |
| } |
| |
| void BasicMessage::setContent(std::auto_ptr<Content>& _content) |
| { |
| Mutex::ScopedLock locker(contentLock); |
| content = _content; |
| } |