/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.qpid.server.protocol.v0_10;

import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;

import javax.security.auth.Subject;

import com.google.common.util.concurrent.Futures;

import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.v0_10.transport.Binary;
import org.apache.qpid.server.protocol.v0_10.transport.ExecutionErrorCode;
import org.apache.qpid.server.protocol.v0_10.transport.ExecutionException;
import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
import org.apache.qpid.server.protocol.v0_10.transport.Method;
import org.apache.qpid.test.utils.QpidTestCase;

public class ServerSessionTest extends QpidTestCase
{

    private VirtualHost<?> _virtualHost;
    private CurrentThreadTaskExecutor _taskExecutor;

    @Override
    public void setUp() throws Exception
    {
        super.setUp();
        BrokerTestHelper.setUp();
        _taskExecutor = new CurrentThreadTaskExecutor();
        _taskExecutor.start();
        _virtualHost = BrokerTestHelper.createVirtualHost(getName());
    }

    @Override
    public void tearDown() throws Exception
    {
        try
        {
            if (_virtualHost != null)
            {
                _virtualHost.close();
            }
        }
        finally
        {
            try
            {
                if (_taskExecutor != null)
                {
                    _taskExecutor.stop();
                }
            }
            finally
            {
                BrokerTestHelper.tearDown();
                super.tearDown();
            }
        }
    }

    public void testOverlargeMessageTest() throws Exception
    {
        final Broker<?> broker = mock(Broker.class);
        when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);

        AmqpPort port = createMockPort();

        final AMQPConnection_0_10 modelConnection = mock(AMQPConnection_0_10.class);
        when(modelConnection.closeAsync()).thenReturn(Futures.immediateFuture(null));
        when(modelConnection.getAddressSpace()).thenReturn(_virtualHost);
        when(modelConnection.getContextProvider()).thenReturn(_virtualHost);
        when(modelConnection.getBroker()).thenReturn(broker);
        when(modelConnection.getEventLogger()).thenReturn(mock(EventLogger.class));
        when(modelConnection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
        when(modelConnection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
        when(modelConnection.getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE)).thenReturn(Connection.DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE);
        when(modelConnection.getChildExecutor()).thenReturn(_taskExecutor);
        when(modelConnection.getModel()).thenReturn(BrokerModel.getInstance());
        when(modelConnection.getPort()).thenReturn(port);

        Subject subject = new Subject();
        when(modelConnection.getSubject()).thenReturn(subject);
        when(modelConnection.getMaxMessageSize()).thenReturn(1024l);
        ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP, modelConnection);
        connection.setVirtualHost(_virtualHost);

        final List<Method> invokedMethods = new ArrayList<>();
        ServerSession session = new ServerSession(connection, new ServerSessionDelegate(),
                                                   new Binary(getName().getBytes()), 0)
        {
            @Override
            public void invoke(final Method m)
            {
                invokedMethods.add(m);
            }
        };
        Session_0_10 modelSession = new Session_0_10(modelConnection, 1, session);
        session.setModelObject(modelSession);
        ServerSessionDelegate delegate = new ServerSessionDelegate();

        MessageTransfer xfr = new MessageTransfer();
        byte[] body1 = new byte[2048];
        xfr.setBody(QpidByteBuffer.wrap(body1));
        delegate.messageTransfer(session, xfr);

        assertFalse("No methods invoked - expecting at least 1", invokedMethods.isEmpty());
        Method firstInvoked = invokedMethods.get(0);
        assertTrue("First invoked method not execution error", firstInvoked instanceof ExecutionException);
        assertEquals(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED, ((ExecutionException)firstInvoked).getErrorCode());

        invokedMethods.clear();

        // test the boundary condition

        byte[] body = new byte[1024];
        xfr.setBody(QpidByteBuffer.wrap(body));
        delegate.messageTransfer(session, xfr);

        assertTrue("Methods invoked when not expecting any", invokedMethods.isEmpty());
    }

    public AmqpPort createMockPort()
    {
        AmqpPort port = mock(AmqpPort.class);
        TaskExecutor childExecutor = new TaskExecutorImpl();
        childExecutor.start();
        when(port.getChildExecutor()).thenReturn(childExecutor);
        when(port.getCategoryClass()).thenReturn(Port.class);
        when(port.getModel()).thenReturn(BrokerModel.getInstance());
        return port;
    }


}
