blob: 4a1cf249a89b4ebd8398d487402d0eade10cb297 [file] [log] [blame]
/*
*
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <iostream>
#include "Basic.h"
#include "AMQMethodBody.h"
#include "ClientChannel.h"
#include "ReturnedMessageHandler.h"
#include "MessageListener.h"
#include "framing/FieldTable.h"
#include "Connection.h"
using namespace std;
namespace qpid {
namespace client {
using namespace sys;
using namespace framing;
Basic::Basic(Channel& ch) : channel(ch), returnsHandler(0) {}
void Basic::consume(
Queue& queue, std::string& tag, MessageListener* listener,
AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields)
{
channel.sendAndReceiveSync<BasicConsumeOkBody>(
synch,
new BasicConsumeBody(
channel.version, 0, queue.getName(), tag, noLocal,
ackMode == NO_ACK, false, !synch,
fields ? *fields : FieldTable()));
if (synch) {
BasicConsumeOkBody::shared_ptr response =
boost::shared_polymorphic_downcast<BasicConsumeOkBody>(
channel.responses.getResponse());
tag = response->getConsumerTag();
}
// FIXME aconway 2007-02-20: Race condition!
// We could receive the first message for the consumer
// before we create the consumer below.
// Move consumer creation to handler for BasicConsumeOkBody
{
Mutex::ScopedLock l(lock);
ConsumerMap::iterator i = consumers.find(tag);
if (i != consumers.end())
THROW_QPID_ERROR(CLIENT_ERROR,
"Consumer already exists with tag="+tag);
Consumer& c = consumers[tag];
c.listener = listener;
c.ackMode = ackMode;
c.lastDeliveryTag = 0;
}
}
void Basic::cancel(const std::string& tag, bool synch) {
Consumer c;
{
Mutex::ScopedLock l(lock);
ConsumerMap::iterator i = consumers.find(tag);
if (i == consumers.end())
return;
c = i->second;
consumers.erase(i);
}
if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
channel.sendAndReceiveSync<BasicCancelOkBody>(
synch, new BasicCancelBody(channel.version, tag, !synch));
}
void Basic::cancelAll(){
ConsumerMap consumersCopy;
{
Mutex::ScopedLock l(lock);
consumersCopy = consumers;
consumers.clear();
}
for (ConsumerMap::iterator i=consumersCopy.begin();
i != consumersCopy.end(); ++i)
{
Consumer& c = i->second;
if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
&& c.lastDeliveryTag > 0)
{
channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
}
}
}
bool Basic::get(Message& msg, const Queue& queue, AckMode ackMode) {
// Expect a message starting with a BasicGetOk
incoming.startGet();
channel.send(new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
return incoming.waitGet(msg);
}
void Basic::publish(
const Message& msg, const Exchange& exchange,
const std::string& routingKey, bool mandatory, bool immediate)
{
const string e = exchange.getName();
string key = routingKey;
channel.send(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate));
//break msg up into header frame and content frame(s) and send these
channel.send(msg.getHeader());
string data = msg.getData();
uint64_t data_length = data.length();
if(data_length > 0){
//frame itself uses 8 bytes
uint32_t frag_size = channel.connection->getMaxFrameSize() - 8;
if(data_length < frag_size){
channel.send(new AMQContentBody(data));
}else{
uint32_t offset = 0;
uint32_t remaining = data_length - offset;
while (remaining > 0) {
uint32_t length = remaining > frag_size ? frag_size : remaining;
string frag(data.substr(offset, length));
channel.send(new AMQContentBody(frag));
offset += length;
remaining = data_length - offset;
}
}
}
}
void Basic::handle(boost::shared_ptr<AMQMethodBody> method) {
assert(method->amqpClassId() ==BasicGetBody::CLASS_ID);
switch(method->amqpMethodId()) {
case BasicDeliverBody::METHOD_ID:
case BasicReturnBody::METHOD_ID:
case BasicGetOkBody::METHOD_ID:
case BasicGetEmptyBody::METHOD_ID:
incoming.add(method);
return;
}
throw Channel::UnknownMethod();
}
void Basic::deliver(Consumer& consumer, Message& msg){
//record delivery tag:
consumer.lastDeliveryTag = msg.getDeliveryTag();
//allow registered listener to handle the message
consumer.listener->received(msg);
if(channel.isOpen()){
bool multiple(false);
switch(consumer.ackMode){
case LAZY_ACK:
multiple = true;
if(++(consumer.count) < channel.getPrefetch())
break;
//else drop-through
case AUTO_ACK:
consumer.lastDeliveryTag = 0;
channel.send(
new BasicAckBody(
channel.version, msg.getDeliveryTag(), multiple));
case NO_ACK: // Nothing to do
case CLIENT_ACK: // User code must ack.
break;
// TODO aconway 2007-02-22: Provide a way for user
// to ack!
}
}
//as it stands, transactionality is entirely orthogonal to ack
//mode, though the acks will not be processed by the broker under
//a transaction until it commits.
}
void Basic::run() {
while(channel.isOpen()) {
try {
Message msg = incoming.waitDispatch();
if(msg.getMethod()->isA<BasicReturnBody>()) {
ReturnedMessageHandler* handler=0;
{
Mutex::ScopedLock l(lock);
handler=returnsHandler;
}
if(handler == 0) {
// TODO aconway 2007-02-20: proper logging.
cout << "Message returned: " << msg.getData() << endl;
}
else
handler->returned(msg);
}
else {
BasicDeliverBody::shared_ptr deliverBody =
boost::shared_polymorphic_downcast<BasicDeliverBody>(
msg.getMethod());
std::string tag = deliverBody->getConsumerTag();
Consumer consumer;
{
Mutex::ScopedLock l(lock);
ConsumerMap::iterator i = consumers.find(tag);
if(i == consumers.end())
THROW_QPID_ERROR(PROTOCOL_ERROR+504,
"Unknown consumer tag=" + tag);
consumer = i->second;
}
deliver(consumer, msg);
}
}
catch (const ShutdownException&) {
/* Orderly shutdown */
}
catch (const Exception& e) {
// FIXME aconway 2007-02-20: Report exception to user.
cout << "client::Basic::run() terminated by: " << e.toString()
<< "(" << typeid(e).name() << ")" << endl;
}
}
}
void Basic::setReturnedMessageHandler(ReturnedMessageHandler* handler){
Mutex::ScopedLock l(lock);
returnsHandler = handler;
}
void Basic::setQos(){
channel.sendAndReceive<BasicQosOkBody>(
new BasicQosBody(channel.version, 0, channel.getPrefetch(), false));
if(channel.isTransactional())
channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version));
}
// TODO aconway 2007-02-22: NOTES:
// Move incoming to BasicChannel - check for uses.
}} // namespace qpid::client