blob: 5673a2c42a3f1c2db1433fe68c282601b7b347a5 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <assert.h>
#include <iostream>
#include <sstream>
#include <algorithm>
#include <functional>
#include <boost/bind.hpp>
#include "BrokerChannel.h"
#include "DeletingTxOp.h"
#include "framing/ChannelAdapter.h"
#include <QpidError.h>
#include <DeliverableMessage.h>
#include <BrokerQueue.h>
#include <BrokerMessage.h>
#include <MessageStore.h>
#include <TxAck.h>
#include <TxPublish.h>
#include "BrokerAdapter.h"
#include "Connection.h"
using std::mem_fun_ref;
using std::bind2nd;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
/**
 * Construct a channel on the given connection.
 *
 * @param con              owning connection; output and protocol version are taken from it.
 * @param id               channel id; channel 0 is implicitly open (see `opened` below).
 * @param _framesize       maximum frame size used when sending message content.
 * @param _store           message store used for transactional prepare and staging (may be 0).
 * @param _stagingThreshold size above which incoming messages are staged to disk
 *                         (passed through to messageBuilder).
 */
Channel::Channel(
    Connection& con, ChannelId id,
    uint32_t _framesize, MessageStore* const _store,
    uint64_t _stagingThreshold
) :
    ChannelAdapter(id, &con.getOutput(), con.getVersion()),
    connection(con),
    currentDeliveryTag(1),
    transactional(false),
    prefetchSize(0),
    prefetchCount(0),
    framesize(_framesize),
    tagGenerator("sgen"),
    accumulatedAck(0),
    store(_store),
    messageBuilder(this, _store, _stagingThreshold),
    opened(id == 0),//channel 0 is automatically open, other must be explicitly opened
    adapter(new BrokerAdapter(*this, con, con.broker))
{
    // Start with no outstanding (unacknowledged) size/count tracked.
    outstanding.reset();
}
// Destructor: ensure the channel is closed so consumers are cancelled and
// unacked messages are requeued (see close()).
Channel::~Channel(){
    close();
}
bool Channel::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
// TODO aconway 2007-02-12: Why is connection token passed in instead
// of using the channel's parent connection?
// Register a consumer on the given queue.
// If tagInOut is empty a tag is generated and returned through it.
// The consumers map takes ownership of the ConsumerImpl; the auto_ptr
// guards it only until queue->consume has succeeded.
void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
                      bool exclusive, ConnectionToken* const connection,
                      const FieldTable*)
{
    if (tagInOut.empty()) {
        tagInOut = tagGenerator.generate();
    }
    std::auto_ptr<ConsumerImpl> consumer(
        new ConsumerImpl(this, tagInOut, queue, connection, acks));
    queue->consume(consumer.get(), exclusive);//may throw exception
    consumers.insert(tagInOut, consumer.release());
}
void Channel::cancel(const string& tag){
// consumers is a ptr_map so erase will delete the consumer
// which will call cancel.
ConsumerImplMap::iterator i = consumers.find(tag);
if (i != consumers.end())
consumers.erase(i);
}
// Close the channel: mark it unopened, delete all consumers (ptr_map clear
// runs each ConsumerImpl destructor, which cancels it on its queue), then
// requeue any unacknowledged messages.
void Channel::close(){
    opened = false;
    consumers.clear();
    recover(true);
}
// Switch the channel into transactional mode (tx.select). There is no way
// back to non-transactional mode in this implementation.
void Channel::begin(){
    transactional = true;
}
// Commit the current transaction: acks accumulated since the last
// commit/rollback are applied together with any enlisted publishes.
void Channel::commit(){
    // txAck is a stack object enlisted by address; it is only valid for the
    // duration of this call — presumably TxBuffer does not retain ops after
    // prepare/commit (TODO confirm against TxBuffer semantics).
    TxAck txAck(accumulatedAck, unacked);
    txBuffer.enlist(&txAck);
    if(txBuffer.prepare(store)){
        txBuffer.commit();
    }
    // NOTE(review): accumulatedAck is cleared even if prepare() failed.
    accumulatedAck.clear();
}
// Roll back the current transaction and forget acks accumulated within it.
void Channel::rollback(){
    txBuffer.rollback();
    accumulatedAck.clear();
}
// Deliver a message to a consumer on this channel.
// When an ack is expected the message is recorded in 'unacked' and counted
// against the prefetch limits before its frames are sent; deliveryLock
// serializes this with ack()/recover().
void Channel::deliver(
    Message::shared_ptr& msg, const string& consumerTag,
    Queue::shared_ptr& queue, bool ackExpected)
{
    Mutex::ScopedLock locker(deliveryLock);
    // Key the delivered messages to the id of the request in which they're sent
    uint64_t deliveryTag = getNextSendRequestId();
    if(ackExpected){
        unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
        outstanding.size += msg->contentSize();
        outstanding.count++;
    }
    //send deliver method, header and content(s)
    msg->deliver(*this, consumerTag, deliveryTag, framesize);
}
// Return true if delivering msg would stay within the channel's prefetch
// limits. A limit of 0 means "unlimited"; the size limit is ignored while
// nothing is unacked so a single oversized message can still be delivered.
bool Channel::checkPrefetch(Message::shared_ptr& msg){
    Mutex::ScopedLock locker(deliveryLock);
    if (prefetchCount && unacked.size() >= prefetchCount)
        return false;
    if (prefetchSize && !unacked.empty()
        && msg->contentSize() + outstanding.size >= prefetchSize)
        return false;
    return true;
}
// Consumer bound to one queue on one channel.
// _connection identifies the publisher's connection for the no_local check
// in deliver(); ack controls whether deliveries are tracked for acking.
Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,
                                    Queue::shared_ptr _queue,
                                    ConnectionToken* const _connection, bool ack
) : parent(_parent), tag(_tag), queue(_queue), connection(_connection),
    ackExpected(ack), blocked(false) {}
// Attempt to deliver msg to this consumer.
// Returns true only if the message was actually sent; returns false when
// suppressed by no_local or when the prefetch limit blocks the consumer.
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
    // no_local: never deliver a message back to its own publisher.
    if (connection && connection == msg->getPublisher())
        return false;
    if (ackExpected && !parent->checkPrefetch(msg)) {
        blocked = true;   // remember so requestDispatch() can retry later
        return false;
    }
    blocked = false;
    parent->deliver(msg, tag, queue, ackExpected);
    return true;
}
// Destructor: deregister from the queue (see cancel()). Invoked when the
// owning ptr_map entry is erased or cleared.
Channel::ConsumerImpl::~ConsumerImpl() {
    cancel();
}
// Deregister this consumer from its queue; safe to call with no queue.
void Channel::ConsumerImpl::cancel(){
    if(queue)
        queue->cancel(this);
}
// Ask the queue to dispatch again if this consumer was previously blocked
// by the prefetch limit (blocked is cleared on the next successful deliver).
// NOTE(review): unlike cancel(), queue is dereferenced without a null check;
// presumably blocked can only be true when queue is set — confirm.
void Channel::ConsumerImpl::requestDispatch(){
    if(blocked)
        queue->dispatch();
}
// Route an inline (message.transfer) message through its exchange.
// In transactional mode the publish is enlisted in the tx buffer and only
// takes effect on commit; otherwise it is routed immediately.
void Channel::handleInlineTransfer(Message::shared_ptr msg)
{
    Exchange::shared_ptr exchange =
        connection.broker.getExchanges().get(msg->getExchange());
    if(transactional){
        // Hold the TxPublish in an auto_ptr so it is not leaked if route()
        // throws; ownership passes to DeletingTxOp on enlist. This matches
        // the pattern already used in complete().
        std::auto_ptr<TxPublish> deliverable(new TxPublish(msg));
        exchange->route(
            *deliverable, msg->getRoutingKey(),
            &(msg->getApplicationHeaders()));
        txBuffer.enlist(new DeletingTxOp(deliverable.release()));
    }else{
        DeliverableMessage deliverable(msg);
        exchange->route(
            deliverable, msg->getRoutingKey(),
            &(msg->getApplicationHeaders()));
    }
}
// Begin assembling an incoming published message.
// Takes ownership of _message by wrapping it in a shared_ptr; header and
// content frames are appended via handleHeader()/handleContent().
void Channel::handlePublish(Message* _message){
    Message::shared_ptr message(_message);
    messageBuilder.initialise(message);
}
// Attach the header frame to the message currently being assembled.
void Channel::handleHeader(AMQHeaderBody::shared_ptr header){
    messageBuilder.setHeader(header);
    //at this point, decide based on the size of the message whether we want
    //to stage it by saving content directly to disk as it arrives
}
// Append a content frame to the message currently being assembled.
void Channel::handleContent(AMQContentBody::shared_ptr content){
    messageBuilder.addContent(content);
}
// Heartbeat frames are currently ignored.
void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) {
    // TODO aconway 2007-01-17: Implement heartbeating.
}
// Called when a message has been fully assembled: route it through its
// exchange, deferring via the tx buffer when the channel is transactional.
void Channel::complete(Message::shared_ptr msg) {
    Exchange::shared_ptr exchange =
        connection.broker.getExchanges().get(msg->getExchange());
    assert(exchange.get());
    if(transactional) {
        // auto_ptr guards TxPublish in case route() throws; ownership is
        // transferred to DeletingTxOp when enlisted.
        std::auto_ptr<TxPublish> deliverable(new TxPublish(msg));
        exchange->route(*deliverable, msg->getRoutingKey(),
                        &(msg->getApplicationHeaders()));
        txBuffer.enlist(new DeletingTxOp(deliverable.release()));
    } else {
        DeliverableMessage deliverable(msg);
        exchange->route(deliverable, msg->getRoutingKey(),
                        &(msg->getApplicationHeaders()));
    }
}
// Ack the range identified by the current request's first/last ack ids
// (taken from the ChannelAdapter base).
void Channel::ack(){
    ack(getFirstAckRequest(), getLastAckRequest());
}
// Used by Basic
// Translate basic.ack semantics into a tag range:
// multiple=true acks everything up to deliveryTag (first=0 is the open
// lower bound), multiple=false acks exactly that one tag.
void Channel::ack(uint64_t deliveryTag, bool multiple){
    uint64_t firstTag = multiple ? 0 : deliveryTag;
    ack(firstTag, deliveryTag);
}
// Acknowledge the delivery-tag range [firstTag, lastTag].
// firstTag == 0 means "everything up to and including lastTag".
void Channel::ack(uint64_t firstTag, uint64_t lastTag){
    if(transactional){
        // In a transaction just record the range; it is applied via TxAck
        // when the transaction commits.
        accumulatedAck.update(firstTag, lastTag);
        //TODO: I think the outstanding prefetch size & count should be updated at this point...
        //TODO: ...this may then necessitate dispatching to consumers
    }else{
        Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
        // i: record matching lastTag; j: start of the range (begin if firstTag==0).
        ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
        ack_iterator j = (firstTag == 0) ?
            unacked.begin() :
            find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
        if(i == unacked.end()){
            throw ConnectionException(530, "Received ack for unrecognised delivery tag");
        }else if(i!=j){
            // Range ack: discard [j, i] then drop them from the list.
            ack_iterator end = ++i;
            for_each(j, end, mem_fun_ref(&DeliveryRecord::discard));
            // NOTE(review): the erase starts at unacked.begin(), not at j —
            // any records before j are removed without being discarded;
            // confirm firstTag!=0 ranges can never skip live records.
            unacked.erase(unacked.begin(), end);
            //recalculate the prefetch:
            outstanding.reset();
            for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding));
        }else{
            // Single-record ack: discard it and deduct it from the prefetch totals.
            i->discard();
            i->subtractFrom(&outstanding);
            unacked.erase(i);
        }
        //if the prefetch limit had previously been reached, there may
        //be messages that can be now be delivered
        std::for_each(consumers.begin(), consumers.end(),
                      boost::bind(&ConsumerImpl::requestDispatch, _1));
    }
}
// Recover unacknowledged messages.
// requeue=true: hand them back to their queues (used on close); the unacked
// list is emptied. requeue=false: redeliver them on this channel, leaving
// the records in place so they must still be acked.
void Channel::recover(bool requeue){
    Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
    if(requeue){
        outstanding.reset();
        // Operate on a copy with unacked already cleared, so requeueing
        // cannot interact with the live list if it re-enters the channel.
        std::list<DeliveryRecord> copy = unacked;
        unacked.clear();
        for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue));
    }else{
        for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
    }
}
// Synchronous basic.get: pop one message from the queue and send it.
// Returns false when the queue is empty. When an ack is expected the
// delivery is tracked in unacked under the delivery lock.
bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){
    Message::shared_ptr msg = queue->dequeue();
    if (!msg)
        return false;

    Mutex::ScopedLock locker(deliveryLock);
    uint64_t myDeliveryTag = getNextSendRequestId();
    msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
                   destination,
                   queue->getMessageCount() + 1, myDeliveryTag,
                   framesize);
    if (ackExpected)
        unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
    return true;
}
// Resend a message with an existing delivery tag (used for redelivery of
// records already tracked in unacked, so no new record is added here).
void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
                      uint64_t deliveryTag)
{
    msg->deliver(*this, consumerTag, deliveryTag, framesize);
}
// Entry point for AMQP methods arriving on this channel.
// Rejects any method other than channel.open on an unopened non-zero
// channel, otherwise dispatches to the BrokerAdapter. Exceptions are mapped
// to protocol-level closes: ChannelException closes only this channel,
// ConnectionException (and any other std::exception, as code 541) closes
// the whole connection.
void Channel::handleMethodInContext(
    boost::shared_ptr<qpid::framing::AMQMethodBody> method,
    const MethodContext& context
)
{
    try{
        if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
            std::stringstream out;
            out << "Attempt to use unopened channel: " << getId();
            throw ConnectionException(504, out.str());
        } else {
            method->invoke(*adapter, context);
        }
    }catch(ChannelException& e){
        // Channel-level error: send channel.close, then drop the channel.
        adapter->getProxy().getChannel().close(
            e.code, e.toString(),
            method->amqpClassId(), method->amqpMethodId());
        connection.closeChannel(getId());
    }catch(ConnectionException& e){
        connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
    }catch(std::exception& e){
        connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
    }
}