| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #include <memory> |
| #include <boost/lexical_cast.hpp> |
| |
| #include <ConnectionRedirectBody.h> |
| #include <ProtocolVersion.h> |
| #include <amqp_framing.h> |
| #include <iostream> |
| #include <qpid_test_plugin.h> |
| #include <sstream> |
| #include <typeinfo> |
| #include <QpidError.h> |
| #include <AMQP_HighestVersion.h> |
| #include "AMQRequestBody.h" |
| #include "AMQResponseBody.h" |
| #include "Requester.h" |
| #include "Responder.h" |
| #include "InProcessBroker.h" |
| #include "client/Connection.h" |
| #include "client/ClientExchange.h" |
| #include "client/ClientQueue.h" |
| |
| using namespace qpid; |
| using namespace qpid::framing; |
| using namespace std; |
| |
| template <class T> |
| std::string tostring(const T& x) |
| { |
| std::ostringstream out; |
| out << x; |
| return out.str(); |
| } |
| |
| class FramingTest : public CppUnit::TestCase |
| { |
| CPPUNIT_TEST_SUITE(FramingTest); |
| CPPUNIT_TEST(testBasicQosBody); |
| CPPUNIT_TEST(testConnectionSecureBody); |
| CPPUNIT_TEST(testConnectionRedirectBody); |
| CPPUNIT_TEST(testAccessRequestBody); |
| CPPUNIT_TEST(testBasicConsumeBody); |
| CPPUNIT_TEST(testConnectionRedirectBodyFrame); |
| CPPUNIT_TEST(testBasicConsumeOkBodyFrame); |
| CPPUNIT_TEST(testRequestBodyFrame); |
| CPPUNIT_TEST(testResponseBodyFrame); |
| CPPUNIT_TEST(testRequester); |
| CPPUNIT_TEST(testResponder); |
| CPPUNIT_TEST(testInlineContent); |
| CPPUNIT_TEST(testContentReference); |
| CPPUNIT_TEST(testContentValidation); |
| CPPUNIT_TEST(testRequestResponseRoundtrip); |
| CPPUNIT_TEST_SUITE_END(); |
| |
| private: |
| Buffer buffer; |
| ProtocolVersion version; |
| AMQP_MethodVersionMap versionMap; |
| |
| public: |
| |
| FramingTest() : buffer(1024), version(highestProtocolVersion) {} |
| |
| void testBasicQosBody() |
| { |
| BasicQosBody in(version, 0xCAFEBABE, 0xABBA, true); |
| in.encodeContent(buffer); |
| buffer.flip(); |
| BasicQosBody out(version); |
| out.decodeContent(buffer); |
| CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); |
| } |
| |
| void testConnectionSecureBody() |
| { |
| std::string s = "security credential"; |
| ConnectionSecureBody in(version, s); |
| in.encodeContent(buffer); |
| buffer.flip(); |
| ConnectionSecureBody out(version); |
| out.decodeContent(buffer); |
| CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); |
| } |
| |
| void testConnectionRedirectBody() |
| { |
| std::string a = "hostA"; |
| std::string b = "hostB"; |
| ConnectionRedirectBody in(version, 0, a, b); |
| in.encodeContent(buffer); |
| buffer.flip(); |
| ConnectionRedirectBody out(version); |
| out.decodeContent(buffer); |
| CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); |
| } |
| |
| void testAccessRequestBody() |
| { |
| std::string s = "text"; |
| AccessRequestBody in(version, s, true, false, true, false, true); |
| in.encodeContent(buffer); |
| buffer.flip(); |
| AccessRequestBody out(version); |
| out.decodeContent(buffer); |
| CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); |
| } |
| |
| void testBasicConsumeBody() |
| { |
| std::string q = "queue"; |
| std::string t = "tag"; |
| BasicConsumeBody in(version, 0, q, t, false, true, false, false, |
| FieldTable()); |
| in.encodeContent(buffer); |
| buffer.flip(); |
| BasicConsumeBody out(version); |
| out.decodeContent(buffer); |
| CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); |
| } |
| |
| |
| void testConnectionRedirectBodyFrame() |
| { |
| std::string a = "hostA"; |
| std::string b = "hostB"; |
| AMQFrame in(version, 999, |
| new ConnectionRedirectBody(version, 0, a, b)); |
| in.encode(buffer); |
| buffer.flip(); |
| AMQFrame out; |
| out.decode(buffer); |
| CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); |
| } |
| |
| void testBasicConsumeOkBodyFrame() |
| { |
| std::string s = "hostA"; |
| AMQFrame in(version, 999, new BasicConsumeOkBody(version, 0, s)); |
| in.encode(buffer); |
| buffer.flip(); |
| AMQFrame out; |
| for(int i = 0; i < 5; i++){ |
| out.decode(buffer); |
| CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); |
| } |
| } |
| |
| void testRequestBodyFrame() { |
| std::string testing("testing"); |
| AMQBody::shared_ptr request(new ChannelOpenBody(version, testing)); |
| AMQFrame in(version, 999, request); |
| in.encode(buffer); |
| buffer.flip(); |
| AMQFrame out; |
| out.decode(buffer); |
| ChannelOpenBody* decoded = |
| dynamic_cast<ChannelOpenBody*>(out.getBody().get()); |
| CPPUNIT_ASSERT(decoded); |
| CPPUNIT_ASSERT_EQUAL(testing, decoded->getOutOfBand()); |
| } |
| |
| void testResponseBodyFrame() { |
| AMQBody::shared_ptr response(new ChannelOkBody(version)); |
| AMQFrame in(version, 999, response); |
| in.encode(buffer); |
| buffer.flip(); |
| AMQFrame out; |
| out.decode(buffer); |
| ChannelOkBody* decoded = |
| dynamic_cast<ChannelOkBody*>(out.getBody().get()); |
| CPPUNIT_ASSERT(decoded); |
| } |
| |
| void testInlineContent() { |
| Content content(INLINE, "MyData"); |
| CPPUNIT_ASSERT(content.isInline()); |
| content.encode(buffer); |
| buffer.flip(); |
| Content recovered; |
| recovered.decode(buffer); |
| CPPUNIT_ASSERT(recovered.isInline()); |
| CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue()); |
| } |
| |
| void testContentReference() { |
| Content content(REFERENCE, "MyRef"); |
| CPPUNIT_ASSERT(content.isReference()); |
| content.encode(buffer); |
| buffer.flip(); |
| Content recovered; |
| recovered.decode(buffer); |
| CPPUNIT_ASSERT(recovered.isReference()); |
| CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue()); |
| } |
| |
| void testContentValidation() { |
| try { |
| Content content(REFERENCE, ""); |
| CPPUNIT_ASSERT(false);//fail, expected exception |
| } catch (QpidError& e) { |
| CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code); |
| CPPUNIT_ASSERT_EQUAL(string("Reference cannot be empty"), e.msg); |
| } |
| |
| try { |
| Content content(2, "Blah"); |
| CPPUNIT_ASSERT(false);//fail, expected exception |
| } catch (QpidError& e) { |
| CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code); |
| CPPUNIT_ASSERT_EQUAL(string("Invalid discriminator: 2"), e.msg); |
| } |
| |
| try { |
| buffer.putOctet(2); |
| buffer.putLongString("blah, blah"); |
| buffer.flip(); |
| Content content; |
| content.decode(buffer); |
| CPPUNIT_ASSERT(false);//fail, expected exception |
| } catch (QpidError& e) { |
| CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code); |
| CPPUNIT_ASSERT_EQUAL(string("Invalid discriminator: 2"), e.msg); |
| } |
| |
| } |
| |
| void testRequester() { |
| Requester r; |
| AMQRequestBody::Data q; |
| AMQResponseBody::Data p; |
| |
| r.sending(q); |
| CPPUNIT_ASSERT_EQUAL(1ULL, q.requestId); |
| CPPUNIT_ASSERT_EQUAL(0ULL, q.responseMark); |
| |
| r.sending(q); |
| CPPUNIT_ASSERT_EQUAL(2ULL, q.requestId); |
| CPPUNIT_ASSERT_EQUAL(0ULL, q.responseMark); |
| |
| // Now process a response |
| p.responseId = 1; |
| p.requestId = 2; |
| r.processed(AMQResponseBody::Data(1, 2)); |
| |
| r.sending(q); |
| CPPUNIT_ASSERT_EQUAL(3ULL, q.requestId); |
| CPPUNIT_ASSERT_EQUAL(1ULL, q.responseMark); |
| |
| try { |
| r.processed(p); // Already processed this response. |
| CPPUNIT_FAIL("Expected exception"); |
| } catch (...) {} |
| |
| try { |
| p.requestId = 50; |
| r.processed(p); // No such request |
| CPPUNIT_FAIL("Expected exception"); |
| } catch (...) {} |
| |
| r.sending(q); // reqId=4 |
| r.sending(q); // reqId=5 |
| r.sending(q); // reqId=6 |
| p.responseId++; |
| p.requestId = 4; |
| p.batchOffset = 2; |
| r.processed(p); |
| r.sending(q); |
| CPPUNIT_ASSERT_EQUAL(7ULL, q.requestId); |
| CPPUNIT_ASSERT_EQUAL(2ULL, q.responseMark); |
| |
| p.responseId++; |
| p.requestId = 1; // Out of order |
| p.batchOffset = 0; |
| r.processed(p); |
| r.sending(q); |
| CPPUNIT_ASSERT_EQUAL(8ULL, q.requestId); |
| CPPUNIT_ASSERT_EQUAL(3ULL, q.responseMark); |
| } |
| |
| void testResponder() { |
| Responder r; |
| AMQRequestBody::Data q; |
| AMQResponseBody::Data p; |
| |
| q.requestId = 1; |
| q.responseMark = 0; |
| r.received(q); |
| p.requestId = q.requestId; |
| r.sending(p); |
| CPPUNIT_ASSERT_EQUAL(1ULL, p.responseId); |
| CPPUNIT_ASSERT_EQUAL(1ULL, p.requestId); |
| CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset); |
| CPPUNIT_ASSERT_EQUAL(0ULL, r.getResponseMark()); |
| |
| q.requestId++; |
| q.responseMark = 1; |
| r.received(q); |
| r.sending(p); |
| CPPUNIT_ASSERT_EQUAL(2ULL, p.responseId); |
| CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset); |
| CPPUNIT_ASSERT_EQUAL(1ULL, r.getResponseMark()); |
| |
| try { |
| // Response mark higher any request ID sent. |
| q.responseMark = 3; |
| r.received(q); |
| } catch(...) {} |
| |
| try { |
| // Response mark lower than previous response mark. |
| q.responseMark = 0; |
| r.received(q); |
| } catch(...) {} |
| |
| // TODO aconway 2007-01-14: Test for batching when supported. |
| |
| } |
| |
| // expect may contain null chars so use string(ptr,size) constructor |
| // Use sizeof(expect)-1 to strip the trailing null. |
| #define ASSERT_FRAME(expect, frame) \ |
| CPPUNIT_ASSERT_EQUAL(string(expect, sizeof(expect)-1), boost::lexical_cast<string>(frame)) |
| |
| void testRequestResponseRoundtrip() { |
| broker::InProcessBroker ibroker(version); |
| client::Connection clientConnection; |
| clientConnection.setConnector(ibroker); |
| clientConnection.open(""); |
| client::Channel c; |
| clientConnection.openChannel(c); |
| |
| client::Exchange exchange( |
| "MyExchange", client::Exchange::TOPIC_EXCHANGE); |
| client::Queue queue("MyQueue", true); |
| c.declareExchange(exchange); |
| c.declareQueue(queue); |
| c.bind(exchange, queue, "MyTopic", framing::FieldTable()); |
| broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin(); |
| ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=9; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++); |
| ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++); |
| ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=100; frameMax=65536; heartbeat=0]", *i++); |
| ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=100; frameMax=65536; heartbeat=0]", *i++); |
| ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++); |
| ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++); |
| ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++); |
| ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: channelId=]", *i++); |
| ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; passive=0; durable=0; autoDelete=0; internal=0; nowait=0; arguments={}]", *i++); |
| ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=2,batch=0): ExchangeDeclareOk: ]", *i++); |
| ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=2): QueueDeclare: ticket=0; queue=MyQueue; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++); |
| ASSERT_FRAME("BROKER: Frame[channel=1; response(id=3,request=3,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++); |
| ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=3): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; nowait=0; arguments={}]", *i++); |
| ASSERT_FRAME("BROKER: Frame[channel=1; response(id=4,request=4,batch=0): QueueBindOk: ]", *i++); |
| } |
| }; |
| |
| |
| // Make this test suite a plugin. |
| CPPUNIT_PLUGIN_IMPLEMENT(); |
| CPPUNIT_TEST_SUITE_REGISTRATION(FramingTest); |
| |
| |
| |