blob: 88e8318832e317a3228dce255bff9754e43077fd [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <Exception.h>
#include <BrokerMessage.h>
#include <MessageBuilder.h>
#include <NullMessageStore.h>
#include <Buffer.h>
#include <qpid_test_plugin.h>
#include <iostream>
#include <memory>
using namespace boost;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
class MessageBuilderTest : public CppUnit::TestCase
{
struct DummyHandler : MessageBuilder::CompletionHandler{
Message::shared_ptr msg;
virtual void complete(Message::shared_ptr& _msg){
msg = _msg;
}
};
class TestMessageStore : public NullMessageStore
{
Buffer* header;
Buffer* content;
const u_int32_t contentBufferSize;
public:
void stage(Message* const msg)
{
if (msg->getPersistenceId() == 0) {
header = new Buffer(msg->encodedHeaderSize());
msg->encodeHeader(*header);
content = new Buffer(contentBufferSize);
msg->setPersistenceId(1);
} else {
throw qpid::Exception("Message already staged!");
}
}
void appendContent(Message* msg, const string& data)
{
if (msg) {
content->putRawData(data);
} else {
throw qpid::Exception("Invalid message id!");
}
}
void destroy(Message* msg)
{
CPPUNIT_ASSERT(msg->getPersistenceId());
}
Message::shared_ptr getRestoredMessage()
{
Message::shared_ptr msg(new Message());
if (header) {
header->flip();
msg->decodeHeader(*header);
delete header;
header = 0;
if (content) {
content->flip();
msg->decodeContent(*content);
delete content;
content = 0;
}
}
return msg;
}
//dont care about any of the other methods:
TestMessageStore(u_int32_t _contentBufferSize) : NullMessageStore(false), header(0), content(0),
contentBufferSize(_contentBufferSize) {}
~TestMessageStore(){}
};
CPPUNIT_TEST_SUITE(MessageBuilderTest);
CPPUNIT_TEST(testHeaderOnly);
CPPUNIT_TEST(test1ContentFrame);
CPPUNIT_TEST(test2ContentFrames);
CPPUNIT_TEST(testStaging);
CPPUNIT_TEST_SUITE_END();
public:
void testHeaderOnly(){
DummyHandler handler;
MessageBuilder builder(&handler);
Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(0);
builder.initialise(message);
CPPUNIT_ASSERT(!handler.msg);
builder.setHeader(header);
CPPUNIT_ASSERT(handler.msg);
CPPUNIT_ASSERT_EQUAL(message, handler.msg);
}
void test1ContentFrame(){
DummyHandler handler;
MessageBuilder builder(&handler);
string data1("abcdefg");
Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(7);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
builder.initialise(message);
CPPUNIT_ASSERT(!handler.msg);
builder.setHeader(header);
CPPUNIT_ASSERT(!handler.msg);
builder.addContent(part1);
CPPUNIT_ASSERT(handler.msg);
CPPUNIT_ASSERT_EQUAL(message, handler.msg);
}
void test2ContentFrames(){
DummyHandler handler;
MessageBuilder builder(&handler);
string data1("abcdefg");
string data2("hijklmn");
Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
builder.initialise(message);
CPPUNIT_ASSERT(!handler.msg);
builder.setHeader(header);
CPPUNIT_ASSERT(!handler.msg);
builder.addContent(part1);
CPPUNIT_ASSERT(!handler.msg);
builder.addContent(part2);
CPPUNIT_ASSERT(handler.msg);
CPPUNIT_ASSERT_EQUAL(message, handler.msg);
}
void testStaging(){
//store must be the last thing to be destroyed or destructor
//of Message fails (it uses the store to call destroy if lazy
//loaded content is in use)
TestMessageStore store(14);
{
DummyHandler handler;
MessageBuilder builder(&handler, &store, 5);
string data1("abcdefg");
string data2("hijklmn");
Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties());
properties->setMessageId("MyMessage");
properties->getHeaders().setString("abc", "xyz");
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
builder.initialise(message);
builder.setHeader(header);
builder.addContent(part1);
builder.addContent(part2);
CPPUNIT_ASSERT(handler.msg);
CPPUNIT_ASSERT_EQUAL(message, handler.msg);
Message::shared_ptr restored = store.getRestoredMessage();
CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange());
CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey());
CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId());
CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"),
restored->getHeaderProperties()->getHeaders().getString("abc"));
CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, restored->contentSize());
}
}
};
// Make this test suite a plugin.
CPPUNIT_PLUGIN_IMPLEMENT();
CPPUNIT_TEST_SUITE_REGISTRATION(MessageBuilderTest);