/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.tests.protocol.v0_10;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder;
import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionSecure;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune;
import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
import org.apache.qpid.server.protocol.v0_10.transport.Header;
import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
import org.apache.qpid.server.protocol.v0_10.transport.Method;
import org.apache.qpid.server.protocol.v0_10.transport.SessionAttached;
import org.apache.qpid.tests.protocol.AbstractFrameTransport;
import org.apache.qpid.tests.protocol.AbstractInteraction;
import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;

public class Interaction extends AbstractInteraction<Interaction>
{
    private final BrokerAdmin _brokerAdmin;
    private final BrokerAdmin.PortType _portType;
    private byte[] _protocolHeader;
    private ConnectionInteraction _connectionInteraction;
    private SessionInteraction _sessionInteraction;
    private MessageInteraction _messageInteraction;
    private ExecutionInteraction _executionInteraction;
    private QueueInteraction _queueInteraction;
    private ExchangeInteraction _exchangeInteraction;
    private int _channelId;
    private TxInteraction _txInteraction;

    public Interaction(final AbstractFrameTransport frameTransport,
                       final BrokerAdmin brokerAdmin,
                       final BrokerAdmin.PortType portType)
    {
        super(frameTransport);
        _connectionInteraction = new ConnectionInteraction(this);
        _sessionInteraction = new SessionInteraction(this);
        _messageInteraction = new MessageInteraction(this);
        _executionInteraction = new ExecutionInteraction(this);
        _txInteraction = new TxInteraction(this);
        _queueInteraction = new QueueInteraction(this);
        _exchangeInteraction = new ExchangeInteraction(this);
        _protocolHeader = getTransport().getProtocolHeader();
        _brokerAdmin = brokerAdmin;
        _portType = portType;
    }

    @Override
    public Interaction protocolHeader(final byte[] header)
    {
        _protocolHeader = header;
        return this;
    }

    @Override
    protected byte[] getProtocolHeader()
    {
        return _protocolHeader;
    }

    public <T extends Method> Interaction sendPerformative(final T performative) throws Exception
    {
        performative.setChannel(_channelId);
        sendPerformativeAndChainFuture(copyPerformative(performative));
        return this;
    }

    public <T extends Method> Interaction sendPerformativeWithoutCopying(final T performative) throws Exception
    {
        performative.setChannel(_channelId);
        sendPerformativeAndChainFuture(performative);
        return this;
    }

    public ConnectionInteraction connection()
    {
        return _connectionInteraction;
    }

    private <T extends Method> T copyPerformative(final T src)
    {
        T dst = (T) Method.create(src.getStructType());
        final BBEncoder encoder = new BBEncoder(4096);
        encoder.init();
        src.write(encoder);
        ByteBuffer buffer = encoder.buffer();

        final BBDecoder decoder = new BBDecoder();
        decoder.init(buffer);
        dst.read(decoder);
        dst.setChannel(src.getChannel());

        if (src.getHeader() != null)
        {
            Header srcHeader = src.getHeader();
            MessageProperties dstMessageProperties = null;
            DeliveryProperties dstDeliveryProperties = null;

            if (srcHeader.getMessageProperties() != null)
            {
                MessageProperties properties = srcHeader.getMessageProperties();
                dstMessageProperties = new MessageProperties();

                encoder.init();
                properties.write(encoder);

                decoder.init(encoder.buffer());
                dstMessageProperties.read(decoder);
            }

            if (srcHeader.getDeliveryProperties() != null)
            {
                DeliveryProperties properties = srcHeader.getDeliveryProperties();
                dstDeliveryProperties = new DeliveryProperties();

                encoder.init();
                properties.write(encoder);

                decoder.init(encoder.buffer());
                dstDeliveryProperties.read(decoder);
            }

            if (dstMessageProperties != null || dstDeliveryProperties != null)
            {
                dst.setHeader(new Header(dstDeliveryProperties, dstMessageProperties));
            }
        }

        if (src.getBody() != null)
        {
            dst.setBody(src.getBody());
        }

        return dst;
    }

    public Interaction negotiateOpen() throws Exception
    {
        authenticateConnection().connection().tuneOk()
                                .connection().open()
                                .consumeResponse(ConnectionOpenOk.class);
        return this;
    }

    public Interaction authenticateConnection() throws Exception
    {
        if (_portType == BrokerAdmin.PortType.ANONYMOUS_AMQP || _portType == BrokerAdmin.PortType.ANONYMOUS_AMQPWS)
        {
            openAnonymous();
        }
        else
        {
            final ConnectionStart start = this.negotiateProtocol().consumeResponse()
                                              .consumeResponse().getLatestResponse(ConnectionStart.class);
            final List<Object> supportedMechanisms =
                    start.getMechanisms() == null ? Collections.emptyList() : start.getMechanisms();

            if (supportedMechanisms.stream().noneMatch(m -> String.valueOf(m).equalsIgnoreCase(ConnectionInteraction.SASL_MECHANISM_PLAIN)))
            {
                if (supportedMechanisms.stream()
                                       .noneMatch(m -> String.valueOf(m).equalsIgnoreCase(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS)))
                {
                    throw new IllegalStateException(String.format(
                            "PLAIN or ANONYMOUS SASL mechanism is not listed among supported '%s'", supportedMechanisms.stream().map(
                                    String::valueOf).collect(
                                    Collectors.joining(","))));
                }
                else
                {
                    openAnonymous();
                }
            }
            else
            {
                final byte[] initialResponse = String.format("\0%s\0%s",
                                                             _brokerAdmin.getValidUsername(),
                                                             _brokerAdmin.getValidPassword())
                                                     .getBytes(UTF_8);
                this.connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_PLAIN).startOk()
                    .consumeResponse(ConnectionSecure.class)
                    .connection().secureOk(initialResponse).consumeResponse(ConnectionTune.class);
            }
        }
        return this;
    }

    private void openAnonymous() throws Exception
    {
        this.negotiateProtocol().consumeResponse()
            .consumeResponse(ConnectionStart.class)
            .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
            .consumeResponse(ConnectionTune.class);
    }

    public SessionInteraction session()
    {
        return _sessionInteraction;
    }

    public int getChannelId()
    {
        return _channelId;
    }

    public Interaction channelId(final int channelId)
    {
        _channelId = channelId;
        return this;
    }

    public Interaction attachSession(final byte[] sessionName) throws Exception
    {
        this.session()
            .attachName(sessionName)
            .attach()
            .consumeResponse(SessionAttached.class)
            .session().commandPointCommandId(0).commandPoint();
        return this;
    }

    public MessageInteraction message()
    {
        return _messageInteraction;
    }

    public ExecutionInteraction execution()
    {
        return _executionInteraction;
    }

    public TxInteraction tx()
    {
        return _txInteraction;
    }

    public QueueInteraction queue()
    {
        return _queueInteraction;
    }

    public ExchangeInteraction exchange()
    {
        return _exchangeInteraction;
    }

    public <T extends Method> T consume(final Class<T> expected,
                                        final Class<? extends Method>... ignore)
            throws Exception
    {
        final Class<? extends Method>[] expectedResponses = Arrays.copyOf(ignore, ignore.length + 1);
        expectedResponses[ignore.length] = expected;

        T completed = null;
        do
        {
            Response<?> response = consumeResponse(expectedResponses).getLatestResponse();
            if (expected.isAssignableFrom(response.getBody().getClass()))
            {
                completed = (T) response.getBody();
            }
        }
        while (completed == null);
        return completed;
    }
}
