blob: 07f94ceb642c7f82d700e50f1d938c46efb72b0d [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <IncomingMessage.h>
#include "framing/AMQHeaderBody.h"
#include "framing/AMQContentBody.h"
#include "BasicGetOkBody.h"
#include "BasicReturnBody.h"
#include "BasicDeliverBody.h"
#include <QpidError.h>
#include <iostream>
namespace qpid {
namespace client {
using namespace sys;
using namespace framing;
struct IncomingMessage::Guard: public Mutex::ScopedLock {
Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) {
im->shutdownError.throwIf();
}
};
IncomingMessage::IncomingMessage() { reset(); }
void IncomingMessage::reset() {
state = &IncomingMessage::expectRequest;
endFn= &IncomingMessage::endRequest;
buildMessage = Message();
}
void IncomingMessage::startGet() {
Guard g(this);
if (state != &IncomingMessage::expectRequest) {
endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress."));
}
else {
state = &IncomingMessage::expectGetOk;
endFn = &IncomingMessage::endGet;
getError.reset();
getState = GETTING;
}
}
bool IncomingMessage::waitGet(Message& msg) {
Guard g(this);
while (getState == GETTING && !shutdownError && !getError)
getReady.wait(lock);
shutdownError.throwIf();
getError.throwIf();
msg = getMessage;
return getState==GOT;
}
Message IncomingMessage::waitDispatch() {
Guard g(this);
while(dispatchQueue.empty() && !shutdownError)
dispatchReady.wait(lock);
shutdownError.throwIf();
Message msg(dispatchQueue.front());
dispatchQueue.pop();
return msg;
}
void IncomingMessage::add(BodyPtr body) {
Guard g(this);
shutdownError.throwIf();
// Call the current state function.
(this->*state)(body);
}
void IncomingMessage::shutdown() {
Mutex::ScopedLock l(lock);
shutdownError.reset(new ShutdownException());
getReady.notify();
dispatchReady.notify();
}
bool IncomingMessage::isShutdown() const {
Mutex::ScopedLock l(lock);
return shutdownError;
}
// Common check for all the expect functions. Called in network thread.
template<class T>
boost::shared_ptr<T> IncomingMessage::expectCheck(BodyPtr body) {
boost::shared_ptr<T> ptr = boost::dynamic_pointer_cast<T>(body);
if (!ptr)
throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
return ptr;
}
void IncomingMessage::expectGetOk(BodyPtr body) {
if (dynamic_cast<BasicGetOkBody*>(body.get()))
state = &IncomingMessage::expectHeader;
else if (dynamic_cast<BasicGetEmptyBody*>(body.get())) {
getState = EMPTY;
endGet();
}
else
throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
}
void IncomingMessage::expectHeader(BodyPtr body) {
AMQHeaderBody::shared_ptr header = expectCheck<AMQHeaderBody>(body);
buildMessage.header = header;
state = &IncomingMessage::expectContent;
checkComplete();
}
void IncomingMessage::expectContent(BodyPtr body) {
AMQContentBody::shared_ptr content = expectCheck<AMQContentBody>(body);
buildMessage.setData(buildMessage.getData() + content->getData());
checkComplete();
}
void IncomingMessage::checkComplete() {
size_t declaredSize = buildMessage.header->getContentSize();
size_t currentSize = buildMessage.getData().size();
if (declaredSize == currentSize)
(this->*endFn)(0);
else if (declaredSize < currentSize)
(this->*endFn)(new QPID_ERROR(
PROTOCOL_ERROR, "Message content exceeds declared size."));
}
void IncomingMessage::expectRequest(BodyPtr body) {
AMQMethodBody::shared_ptr method = expectCheck<AMQMethodBody>(body);
buildMessage.setMethod(method);
state = &IncomingMessage::expectHeader;
}
void IncomingMessage::endGet(Exception* ex) {
getError.reset(ex);
if (getState == GETTING) {
getMessage = buildMessage;
getState = GOT;
}
reset();
getReady.notify();
}
void IncomingMessage::endRequest(Exception* ex) {
ExceptionHolder eh(ex);
if (!eh) {
dispatchQueue.push(buildMessage);
reset();
dispatchReady.notify();
}
eh.throwIf();
}
}} // namespace qpid::client