| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #include "QpidError.h" |
| #include "BrokerMessageMessage.h" |
| #include "ChannelAdapter.h" |
| #include "MessageTransferBody.h" |
| #include "MessageOpenBody.h" |
| #include "MessageCloseBody.h" |
| #include "MessageAppendBody.h" |
| #include "Reference.h" |
| #include "framing/FieldTable.h" |
| #include "framing/BasicHeaderProperties.h" |
| |
| #include <algorithm> |
| |
| using namespace std; |
| using namespace qpid::framing; |
| |
| namespace qpid { |
| namespace broker { |
| |
| MessageMessage::MessageMessage( |
| ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_ |
| ) : Message(publisher, transfer_->getDestination(), |
| transfer_->getRoutingKey(), |
| transfer_->getMandatory(), |
| transfer_->getImmediate(), |
| transfer_), |
| requestId(requestId_), |
| transfer(transfer_) |
| {} |
| |
| MessageMessage::MessageMessage( |
| ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_, |
| ReferencePtr reference_ |
| ) : Message(publisher, transfer_->getDestination(), |
| transfer_->getRoutingKey(), |
| transfer_->getMandatory(), |
| transfer_->getImmediate(), |
| transfer_), |
| requestId(requestId_), |
| transfer(transfer_), |
| reference(reference_) |
| {} |
| |
| // TODO: astitcher 1-Mar-2007: This code desperately needs better factoring |
| void MessageMessage::transferMessage( |
| framing::ChannelAdapter& channel, |
| const std::string& consumerTag, |
| uint32_t framesize) |
| { |
| const framing::Content& body = transfer->getBody(); |
| |
| // Send any reference data |
| if (!body.isInline()){ |
| // Open |
| channel.send(new MessageOpenBody(channel.getVersion(), reference->getId())); |
| // Appends |
| for(Reference::Appends::const_iterator a = reference->getAppends().begin(); |
| a != reference->getAppends().end(); |
| ++a) { |
| uint32_t sizeleft = (*a)->size(); |
| const string& content = (*a)->getBytes(); |
| // Calculate overhead bytes |
| // Assume that the overhead is constant as the reference name doesn't change |
| uint32_t overhead = sizeleft - content.size(); |
| string::size_type contentStart = 0; |
| while (sizeleft) { |
| string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead; |
| channel.send(new MessageAppendBody(channel.getVersion(), reference->getId(), |
| string(content, contentStart, contentSize))); |
| sizeleft -= contentSize; |
| contentStart += contentSize; |
| } |
| } |
| } |
| |
| // The transfer |
| if ( transfer->size()<=framesize ) { |
| channel.send( |
| new MessageTransferBody(channel.getVersion(), |
| transfer->getTicket(), |
| consumerTag, |
| getRedelivered(), |
| transfer->getImmediate(), |
| transfer->getTtl(), |
| transfer->getPriority(), |
| transfer->getTimestamp(), |
| transfer->getDeliveryMode(), |
| transfer->getExpiration(), |
| getExchange(), |
| getRoutingKey(), |
| transfer->getMessageId(), |
| transfer->getCorrelationId(), |
| transfer->getReplyTo(), |
| transfer->getContentType(), |
| transfer->getContentEncoding(), |
| transfer->getUserId(), |
| transfer->getAppId(), |
| transfer->getTransactionId(), |
| transfer->getSecurityToken(), |
| transfer->getApplicationHeaders(), |
| body, |
| transfer->getMandatory())); |
| } else { |
| // Thing to do here is to construct a simple reference message then deliver that instead |
| // fragmentation will be taken care of in the delivery if necessary; |
| string content = body.getValue(); |
| string refname = "dummy"; |
| TransferPtr newTransfer( |
| new MessageTransferBody(channel.getVersion(), |
| transfer->getTicket(), |
| consumerTag, |
| getRedelivered(), |
| transfer->getImmediate(), |
| transfer->getTtl(), |
| transfer->getPriority(), |
| transfer->getTimestamp(), |
| transfer->getDeliveryMode(), |
| transfer->getExpiration(), |
| getExchange(), |
| getRoutingKey(), |
| transfer->getMessageId(), |
| transfer->getCorrelationId(), |
| transfer->getReplyTo(), |
| transfer->getContentType(), |
| transfer->getContentEncoding(), |
| transfer->getUserId(), |
| transfer->getAppId(), |
| transfer->getTransactionId(), |
| transfer->getSecurityToken(), |
| transfer->getApplicationHeaders(), |
| framing::Content(REFERENCE, refname), |
| transfer->getMandatory())); |
| ReferencePtr newRef(new Reference(refname)); |
| Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content)); |
| newRef->append(newAppend); |
| MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef); |
| newMsg.transferMessage(channel, consumerTag, framesize); |
| return; |
| } |
| // Close any reference data |
| if (!body.isInline()){ |
| // Close |
| channel.send(new MessageCloseBody(channel.getVersion(), reference->getId())); |
| } |
| } |
| |
| void MessageMessage::deliver( |
| framing::ChannelAdapter& channel, |
| const std::string& consumerTag, |
| uint64_t /*deliveryTag*/, |
| uint32_t framesize) |
| { |
| transferMessage(channel, consumerTag, framesize); |
| } |
| |
| void MessageMessage::sendGetOk( |
| const framing::MethodContext& context, |
| const std::string& destination, |
| uint32_t /*messageCount*/, |
| uint64_t /*deliveryTag*/, |
| uint32_t framesize) |
| { |
| framing::ChannelAdapter* channel = context.channel; |
| transferMessage(*channel, destination, framesize); |
| } |
| |
| bool MessageMessage::isComplete() |
| { |
| return true; |
| } |
| |
| uint64_t MessageMessage::contentSize() const |
| { |
| if (transfer->getBody().isInline()) |
| return transfer->getBody().getValue().size(); |
| else |
| return reference->getSize(); |
| } |
| |
| qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() |
| { |
| return 0; // FIXME aconway 2007-02-05: |
| } |
| |
| const FieldTable& MessageMessage::getApplicationHeaders() |
| { |
| return transfer->getApplicationHeaders(); |
| } |
| bool MessageMessage::isPersistent() |
| { |
| return transfer->getDeliveryMode() == PERSISTENT; |
| } |
| |
| uint32_t MessageMessage::encodedSize() |
| { |
| THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); |
| return 0; // FIXME aconway 2007-02-05: |
| } |
| |
| uint32_t MessageMessage::encodedHeaderSize() |
| { |
| THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); |
| return 0; // FIXME aconway 2007-02-05: |
| } |
| |
| uint32_t MessageMessage::encodedContentSize() |
| { |
| THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); |
| return 0; // FIXME aconway 2007-02-05: |
| } |
| |
| uint64_t MessageMessage::expectedContentSize() |
| { |
| THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); |
| return 0; // FIXME aconway 2007-02-05: |
| } |
| |
| |
| }} // namespace qpid::broker |