/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

package org.apache.qpid.tests.protocol.v1_0.messaging;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;

public class MultiTransferTest extends BrokerAdminUsingTestBase
{
    private InetSocketAddress _brokerAddress;
    private String _originalMmsMessageStorePersistence;

    @Before
    public void setUp()
    {
        _originalMmsMessageStorePersistence = System.getProperty("qpid.tests.mms.messagestore.persistence");
        System.setProperty("qpid.tests.mms.messagestore.persistence", "false");

        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
    }

    @After
    public void tearDown()
    {
        if (_originalMmsMessageStorePersistence != null)
        {
            System.setProperty("qpid.tests.mms.messagestore.persistence", _originalMmsMessageStorePersistence);
        }
        else
        {
            System.clearProperty("qpid.tests.mms.messagestore.persistence");
        }
    }

    @Test
    @SpecificationTest(section = "2.6.14",
            description = "For messages that are too large to fit within the maximum frame size, additional data MAY"
                          + " be transferred in additional transfer frames by setting the more flag on all"
                          + " but the last transfer frame")
    public void multiTransferMessage() throws Exception
    {
        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
        {
            QpidByteBuffer[] payloads = Utils.splitPayload(getTestName(), 2);

            final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
            final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));

            Interaction interaction = transport.newInteraction();
            Disposition disposition = interaction.negotiateProtocol().consumeResponse()
                                                 .open().consumeResponse(Open.class)
                                                 .begin().consumeResponse(Begin.class)
                                                 .attachRole(Role.SENDER)
                                                 .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                                                 .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
                                                 .attach().consumeResponse(Attach.class)
                                                 .consumeResponse(Flow.class)
                                                 .transferPayload(payloads[0])
                                                 .transferDeliveryId(deliveryId)
                                                 .transferDeliveryTag(deliveryTag)
                                                 .transferMore(true)
                                                 .transfer()
                                                 .sync()
                                                 .transferMore(false)
                                                 .transferPayload(payloads[1])
                                                 .transfer()
                                                 .consumeResponse()
                                                 .getLatestResponse(Disposition.class);

            for (final QpidByteBuffer payload : payloads)
            {
                payload.dispose();
            }
            assertThat(disposition.getFirst(), is(equalTo(deliveryId)));
            assertThat(disposition.getLast(), oneOf(null, deliveryId));
            assertThat(disposition.getSettled(), is(equalTo(true)));
        }
        assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
    }

    @Test
    @SpecificationTest(section = "2.7.5",
            description = "[delivery-id] On continuation transfers the delivery-id MAY be omitted..."
                          + "[delivery-tag] field MUST be specified for the first transfer of a multi-transfer"
                          + " message and can only be omitted for continuation transfers.")
    public void multiTransferMessageOmittingOptionalTagAndID() throws Exception
    {
        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
        {
            QpidByteBuffer[] payloads = Utils.splitPayload(getTestName(), 4);
            final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
            final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));

            Interaction interaction = transport.newInteraction();
            interaction.negotiateProtocol().consumeResponse()
                       .open().consumeResponse(Open.class)
                       .begin().consumeResponse(Begin.class)
                       .attachRole(Role.SENDER)
                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
                       .attach().consumeResponse(Attach.class)
                       .consumeResponse(Flow.class)
                       .transferDeliveryId(deliveryId)
                       .transferDeliveryTag(deliveryTag)
                       .transferMore(true)
                       .transferPayload(payloads[0])
                       .transfer()
                       .sync()
                       .transferDeliveryId(deliveryId)
                       .transferDeliveryTag(null)
                       .transferMore(true)
                       .transferPayload(payloads[1])
                       .transfer()
                       .sync()
                       .transferDeliveryId(null)
                       .transferDeliveryTag(deliveryTag)
                       .transferMore(true)
                       .transferPayload(payloads[2])
                       .transfer()
                       .sync()
                       .transferDeliveryId(null)
                       .transferDeliveryTag(null)
                       .transferMore(false)
                       .transferPayload(payloads[3])
                       .transfer()
                       .consumeResponse();

            Disposition disposition = interaction.getLatestResponse(Disposition.class);

            for (final QpidByteBuffer payload : payloads)
            {
                payload.dispose();
            }
            assertThat(disposition.getFirst(), is(equalTo(deliveryId)));
            assertThat(disposition.getLast(), oneOf(null, deliveryId));
            assertThat(disposition.getSettled(), is(equalTo(true)));
            assertThat(disposition.getState(), is(instanceOf(Accepted.class)));
        }
        assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
    }

    @Test
    @SpecificationTest(section = "2.6.14",
            description = "The sender MAY indicate an aborted attempt to deliver a message by setting the abort flag on the last transfer."
                          + "In this case the receiver MUST discard the message data that was transferred prior to the abort.")
    public void abortMultiTransferMessage() throws Exception
    {
        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
        {
            QpidByteBuffer[] payloads = Utils.splitPayload(getTestName(), 2);

            final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
            final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));

            Interaction interaction = transport.newInteraction();
            interaction.negotiateProtocol().consumeResponse()
                       .open().consumeResponse(Open.class)
                       .begin().consumeResponse(Begin.class)
                       .attachRole(Role.SENDER)
                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
                       .attach().consumeResponse(Attach.class)
                       .consumeResponse(Flow.class)
                       .transferPayload(payloads[0])
                       .transferDeliveryId(deliveryId)
                       .transferDeliveryTag(deliveryTag)
                       .transferMore(true)
                       .transfer()
                       .sync()
                       .transferPayload(null)
                       .transferMore(null)
                       .transferAborted(true)
                       .transfer();

            for (final QpidByteBuffer payload : payloads)
            {
                payload.dispose();
            }
            Response<?> latestResponse = interaction.consumeResponse(new Class<?>[] {null}).getLatestResponse();
            assertThat(latestResponse, is(nullValue()));
        }
    }

    @Test
    @SpecificationTest(section = "2.6.14",
            description = "[...]messages being transferred along different links MAY be interleaved")
    public void multiTransferInterleaved() throws Exception
    {
        String messageContent1 = getTestName() + "_1";
        String messageContent2 = getTestName() + "_2";
        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
        {
            QpidByteBuffer[] messagePayload1 = Utils.splitPayload(messageContent1, 2);
            QpidByteBuffer[] messagePayload2 = Utils.splitPayload(messageContent2, 2);

            UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
            UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
            Binary deliveryTag1 = new Binary("testTransfer1".getBytes(UTF_8));
            Binary deliveryTag2 = new Binary("testTransfer2".getBytes(UTF_8));
            UnsignedInteger deliverId1 = UnsignedInteger.ZERO;
            UnsignedInteger deliveryId2 = UnsignedInteger.ONE;

            Interaction interaction = transport.newInteraction();

            interaction.negotiateProtocol().consumeResponse()
                       .open().consumeResponse(Open.class)
                       .begin().consumeResponse(Begin.class)

                       .attachName("testLink1")
                       .attachHandle(linkHandle1)
                       .attachRole(Role.SENDER)
                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
                       .attach().consumeResponse(Attach.class)
                       .consumeResponse(Flow.class)

                       .attachName("testLink2")
                       .attachHandle(linkHandle2)
                       .attachRole(Role.SENDER)
                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
                       .attach().consumeResponse(Attach.class)
                       .consumeResponse(Flow.class)

                       .transferHandle(linkHandle1)
                       .transferDeliveryId(deliverId1)
                       .transferDeliveryTag(deliveryTag1)
                       .transferMore(true)
                       .transferPayload(messagePayload1[0])
                       .transfer()
                       .sync()

                       .transferHandle(linkHandle2)
                       .transferDeliveryId(deliveryId2)
                       .transferDeliveryTag(deliveryTag2)
                       .transferMore(true)
                       .transferPayload(messagePayload2[0])
                       .transfer()
                       .sync()

                       .transferHandle(linkHandle1)
                       .transferDeliveryId(deliverId1)
                       .transferDeliveryTag(deliveryTag1)
                       .transferMore(false)
                       .transferPayload(messagePayload1[1])
                       .transfer()
                       .sync()

                       .transferHandle(linkHandle2)
                       .transferDeliveryId(deliveryId2)
                       .transferDeliveryTag(deliveryTag2)
                       .transferMore(false)
                       .transferPayload(messagePayload2[1])
                       .transfer()
                       .sync();

            for (final QpidByteBuffer payload : messagePayload1)
            {
                payload.dispose();
            }
            for (final QpidByteBuffer payload : messagePayload2)
            {
                payload.dispose();
            }

            Map<UnsignedInteger, Disposition> dispositionMap = new HashMap<>();
            for (int i = 0; i < 2; i++)
            {
                Disposition disposition = interaction.consumeResponse(Disposition.class)
                                                     .getLatestResponse(Disposition.class);
                dispositionMap.put(disposition.getFirst(), disposition);

                assertThat(disposition.getLast(), oneOf(null, disposition.getFirst()));
                assertThat(disposition.getSettled(), is(equalTo(true)));
                assertThat(disposition.getState(), is(instanceOf(Accepted.class)));
            }

            assertThat("Unexpected number of dispositions", dispositionMap.size(), equalTo(2));
            assertThat(dispositionMap.containsKey(deliverId1), is(true));
            assertThat(dispositionMap.containsKey(deliveryId2), is(true));
        }
        assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(messageContent1)));
        assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(messageContent2)));
    }

    @Test
    @SpecificationTest(section = "2.6.14",
            description = "[...]messages transferred along a single link MUST NOT be interleaved")
    public void illegallyInterleavedMultiTransferOnSingleLink() throws Exception
    {
        String messageContent1 = getTestName() + "_1";
        String messageContent2 = getTestName() + "_2";
        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
        {
            QpidByteBuffer[] messagePayload1 = Utils.splitPayload(messageContent1, 2);
            QpidByteBuffer[] messagePayload2 = Utils.splitPayload(messageContent2, 2);

            Binary deliveryTag1 = new Binary("testTransfer1".getBytes(UTF_8));
            Binary deliveryTag2 = new Binary("testTransfer2".getBytes(UTF_8));
            UnsignedInteger deliverId1 = UnsignedInteger.ZERO;
            UnsignedInteger deliveryId2 = UnsignedInteger.ONE;

            Interaction interaction = transport.newInteraction();

            interaction.negotiateProtocol().consumeResponse()
                       .open().consumeResponse(Open.class)
                       .begin().consumeResponse(Begin.class)

                       .attachRole(Role.SENDER)
                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
                       .attach().consumeResponse(Attach.class)
                       .consumeResponse(Flow.class)

                       .transferDeliveryId(deliverId1)
                       .transferDeliveryTag(deliveryTag1)
                       .transferMore(true)
                       .transferPayload(messagePayload1[0])
                       .transfer()
                       .sync()

                       .transferDeliveryId(deliveryId2)
                       .transferDeliveryTag(deliveryTag2)
                       .transferMore(true)
                       .transferPayload(messagePayload2[0])
                       .transfer()
                       .sync();
            for (final QpidByteBuffer payload : messagePayload1)
            {
                payload.dispose();
            }
            for (final QpidByteBuffer payload : messagePayload2)
            {
                payload.dispose();
            }

            interaction.consumeResponse(Detach.class, End.class, Close.class);
        }
    }
}
