/*
 *
 * Copyright (c) 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "unit_test.h"

#include "qpid/SessionState.h"
#include "qpid/Exception.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/SessionFlushBody.h"

#include <boost/bind.hpp>
#include <algorithm>
#include <functional>
#include <numeric>

namespace qpid {
namespace tests {

QPID_AUTO_TEST_SUITE(SessionStateTestSuite)

using namespace std;
using namespace qpid::framing;

// ================================================================
// Utility functions.

// Apply f to [begin, end) and accumulate the result
template <class Iter, class T, class F>
T applyAccumulate(Iter begin, Iter end, T seed, const F& f) {
    return std::accumulate(begin, end, seed, boost::bind(std::plus<T>(), _1, boost::bind(f, _2)));
}

// Create a frame with a one-char string.
AMQFrame& frame(char s) {
    static AMQFrame frame((AMQContentBody(string(&s, 1))));
    return frame;
}

// Simple string representation of a frame.
string str(const AMQFrame& f) {
    if (f.getMethod()) return "C"; // Command or Control
    const AMQContentBody* c = dynamic_cast<const AMQContentBody*>(f.getBody());
    if (c) return c->getData(); // Return data for content frames.
    return "H";                 // Must be a header.
}
// Make a string from a range of frames.
string str(const boost::iterator_range<vector<AMQFrame>::const_iterator>& frames) {
    string (*strFrame)(const AMQFrame&) = str;
    return applyAccumulate(frames.begin(), frames.end(), string(), ptr_fun(strFrame));
}
// Make a transfer command frame.
AMQFrame transferFrame(bool hasContent) {
    AMQFrame t((MessageTransferBody()));
    t.setFirstFrame(true);
    t.setLastFrame(true);
    t.setFirstSegment(true);
    t.setLastSegment(!hasContent);
    return t;
}
// Make a content frame
AMQFrame contentFrame(string content, bool isLast=true) {
    AMQFrame f((AMQContentBody(content)));
    f.setFirstFrame(true);
    f.setLastFrame(true);
    f.setFirstSegment(false);
    f.setLastSegment(isLast);
    return f;
}
AMQFrame contentFrameChar(char content, bool isLast=true) {
    return contentFrame(string(1, content), isLast);
}

// Send frame & return size of frame.
size_t send(qpid::SessionState& s, const AMQFrame& f) { s.senderRecord(f); return f.encodedSize(); }
// Send transfer command with no content.
size_t transfer0(qpid::SessionState& s) { return send(s, transferFrame(false)); }
// Send transfer frame with single content frame.
size_t transfer1(qpid::SessionState& s, string content) {
    return send(s,transferFrame(true)) + send(s,contentFrame(content));
}
size_t transfer1Char(qpid::SessionState& s, char content) {
    return transfer1(s, string(1,content));
}

// Send transfer frame with multiple single-byte content frames.
size_t transferN(qpid::SessionState& s, string content) {
    size_t size=send(s, transferFrame(!content.empty()));
    if (!content.empty()) {
        char last = content[content.size()-1];
        content.resize(content.size()-1);
        size += applyAccumulate(content.begin(), content.end(), 0,
                                boost::bind(&send, boost::ref(s),
                                     boost::bind(contentFrameChar, _1, false)));
        size += send(s, contentFrameChar(last, true));
    }
    return size;
}

// Send multiple transfers with single-byte content.
size_t transfers(qpid::SessionState& s, string content) {
    return applyAccumulate(content.begin(), content.end(), 0,
                           boost::bind(transfer1Char, boost::ref(s), _1));
}

size_t contentFrameSize(size_t n=1) { return AMQFrame(( AMQContentBody())).encodedSize() + n; }
size_t transferFrameSize() { return AMQFrame((MessageTransferBody())).encodedSize(); }

// ==== qpid::SessionState test classes

using qpid::SessionId;
using qpid::SessionPoint;


QPID_AUTO_TEST_CASE(testSendGetReplyList) {
    qpid::SessionState s;
    s.setTimeout(1);
    s.senderGetCommandPoint();
    transfer1(s, "abc");
    transfers(s, "def");
    transferN(s, "xyz");
    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(0,0))),"CabcCdCeCfCxyz");
    // Ignore controls.
    s.senderRecord(AMQFrame(new SessionFlushBody()));
    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(2,0))),"CeCfCxyz");
}

QPID_AUTO_TEST_CASE(testNeedFlush) {
    qpid::SessionState::Configuration c;
    // sync after 2 1-byte transfers or equivalent bytes.
    c.replayFlushLimit = 2*(transferFrameSize()+contentFrameSize());
    qpid::SessionState s(SessionId(), c);
    s.setTimeout(1);
    s.senderGetCommandPoint();
    transfers(s, "a");
    BOOST_CHECK(!s.senderNeedFlush());
    transfers(s, "b");
    BOOST_CHECK(s.senderNeedFlush());
    s.senderRecordFlush();
    BOOST_CHECK(!s.senderNeedFlush());
    transfers(s, "c");
    BOOST_CHECK(!s.senderNeedFlush());
    transfers(s, "d");
    BOOST_CHECK(s.senderNeedFlush());
    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint())), "CaCbCcCd");
}

QPID_AUTO_TEST_CASE(testPeerConfirmed) {
    qpid::SessionState::Configuration c;
    // sync after 2 1-byte transfers or equivalent bytes.
    c.replayFlushLimit = 2*(transferFrameSize()+contentFrameSize());
    qpid::SessionState s(SessionId(), c);
    s.setTimeout(1);
    s.senderGetCommandPoint();
    transfers(s, "ab");
    BOOST_CHECK(s.senderNeedFlush());
    transfers(s, "cd");
    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(0,0))), "CaCbCcCd");
    s.senderConfirmed(SessionPoint(3));
    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(3,0))), "Cd");
    BOOST_CHECK(!s.senderNeedFlush());

    // Multi-frame transfer.
    transfer1(s, "efg");
    transfers(s, "xy");
    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(3,0))), "CdCefgCxCy");
    BOOST_CHECK(s.senderNeedFlush());

    s.senderConfirmed(SessionPoint(4));
    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(4,0))), "CefgCxCy");
    BOOST_CHECK(s.senderNeedFlush());

    s.senderConfirmed(SessionPoint(5));
    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(5,0))), "CxCy");
    BOOST_CHECK(s.senderNeedFlush());

    s.senderConfirmed(SessionPoint(6));
    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(6,0))), "Cy");
    BOOST_CHECK(!s.senderNeedFlush());
}

QPID_AUTO_TEST_CASE(testPeerCompleted) {
    qpid::SessionState s;
    s.setTimeout(1);
    s.senderGetCommandPoint();
    // Completion implies confirmation
    transfers(s, "abc");
    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(0,0))), "CaCbCc");
    SequenceSet set(SequenceSet() + 0 + 1);
    s.senderCompleted(set);
    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(2,0))), "Cc");

    transfers(s, "def");
    // We dont do out-of-order confirmation, so this will only confirm up to 3:
    set = SequenceSet(SequenceSet() + 2 + 3 + 5);
    s.senderCompleted(set);
    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(4,0))), "CeCf");
}

QPID_AUTO_TEST_CASE(testReceive) {
    // Advance expected/received correctly
    qpid::SessionState s;
    s.receiverSetCommandPoint(SessionPoint());
    BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(0));
    BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(0));

    BOOST_CHECK(s.receiverRecord(transferFrame(false)));
    BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(1));
    BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(1));

    BOOST_CHECK(s.receiverRecord(transferFrame(true)));
    SessionPoint point = SessionPoint(1, transferFrameSize());
    BOOST_CHECK_EQUAL(s.receiverGetExpected(), point);
    BOOST_CHECK_EQUAL(s.receiverGetReceived(), point);
    BOOST_CHECK(s.receiverRecord(contentFrame("", false)));
    point.offset += contentFrameSize(0);
    BOOST_CHECK_EQUAL(s.receiverGetExpected(), point);
    BOOST_CHECK_EQUAL(s.receiverGetReceived(), point);
    BOOST_CHECK(s.receiverRecord(contentFrame("", true)));
    BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(2));
    BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(2));

    // Idempotence barrier, rewind expected & receive some duplicates.
    s.receiverSetCommandPoint(SessionPoint(1));
    BOOST_CHECK(!s.receiverRecord(transferFrame(false)));
    BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(2));
    BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(2));
    BOOST_CHECK(s.receiverRecord(transferFrame(false)));
    BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(3));
    BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(3));
}

QPID_AUTO_TEST_CASE(testCompleted) {
    // completed & unknownCompleted
    qpid::SessionState s;
    s.receiverSetCommandPoint(SessionPoint());
    s.receiverRecord(transferFrame(false));
    s.receiverRecord(transferFrame(false));
    s.receiverRecord(transferFrame(false));
    s.receiverCompleted(1);
    BOOST_CHECK_EQUAL(s.receiverGetUnknownComplete(), SequenceSet(SequenceSet()+1));
    s.receiverCompleted(0);
    BOOST_CHECK_EQUAL(s.receiverGetUnknownComplete(),
                      SequenceSet(SequenceSet() + qpid::Range<SequenceNumber>(0,2)));
    s.receiverKnownCompleted(SequenceSet(SequenceSet()+1));
    BOOST_CHECK_EQUAL(s.receiverGetUnknownComplete(), SequenceSet(SequenceSet()+2));
    // TODO aconway 2008-04-30: missing tests for known-completed.
}

QPID_AUTO_TEST_CASE(testNeedKnownCompleted) {
    size_t flushInterval= 2*(transferFrameSize()+contentFrameSize())+1;
    qpid::SessionState::Configuration c(flushInterval);
    qpid::SessionState s(qpid::SessionId(), c);
    s.senderGetCommandPoint();
    transfers(s, "a");
    SequenceSet set(SequenceSet() + 0);
    s.senderCompleted(set);
    BOOST_CHECK(!s.senderNeedKnownCompleted());

    transfers(s, "b");
    set += 1;
    s.senderCompleted(set);
    BOOST_CHECK(!s.senderNeedKnownCompleted());

    transfers(s, "c");
    set += 2;
    s.senderCompleted(set);
    BOOST_CHECK(s.senderNeedKnownCompleted());
    s.senderRecordKnownCompleted();
    BOOST_CHECK(!s.senderNeedKnownCompleted());

    transfers(s, "de");
    set += 3;
    set += 4;
    s.senderCompleted(set);
    BOOST_CHECK(!s.senderNeedKnownCompleted());

    transfers(s, "f");
    set += 2;
    s.senderCompleted(set);
    BOOST_CHECK(s.senderNeedKnownCompleted());
    s.senderRecordKnownCompleted();
    BOOST_CHECK(!s.senderNeedKnownCompleted());
}


QPID_AUTO_TEST_SUITE_END()

}} // namespace qpid::tests
