/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.openwire.codec;

import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.jms.JMSException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.openwire.codec.OpenWireFormat;
import org.apache.activemq.openwire.codec.OpenWireFormatFactory;
import org.apache.activemq.openwire.commands.BrokerInfo;
import org.apache.activemq.openwire.commands.Command;
import org.apache.activemq.openwire.commands.KeepAliveInfo;
import org.apache.activemq.openwire.commands.Message;
import org.apache.activemq.openwire.commands.MessageDispatch;
import org.apache.activemq.openwire.commands.Response;
import org.apache.activemq.openwire.commands.ShutdownInfo;
import org.apache.activemq.openwire.commands.WireFormatInfo;
import org.apache.activemq.openwire.util.TcpTransport;
import org.apache.activemq.openwire.util.TransportListener;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class used in testing the interoperability between the OpenWire
 * commands and Marshalers in this library and those in ActiveMQ.
 */
public abstract class OpenWireInteropTestSupport implements TransportListener {

    private static final Logger LOG = LoggerFactory.getLogger(OpenWireInteropTestSupport.class);

    /** JUnit rule giving access to the name of the running test method. */
    @Rule public TestName name = new TestName();

    /** Broker created and started in {@link #setUp()}, stopped in {@link #tearDown()}. */
    protected BrokerService brokerService;

    private TcpTransport transport;

    /** Publishable connect URI of the connector added in {@link #createBroker()}. */
    protected URI connectionURI;

    private OpenWireFormatFactory factory;

    /** Wire format built in {@link #setUp()} from the subclass supplied version and encoding. */
    protected OpenWireFormat wireFormat;

    /** Released by {@link #handleWireFormatInfo(WireFormatInfo)}; re-created on every {@link #connect()}. */
    private CountDownLatch connected;

    // The fields below are written from the TransportListener callbacks and read through
    // the accessors. They are volatile so a value stored by a callback is visible to a
    // different reading thread.
    // NOTE(review): presumably the callbacks run on the transport's own thread - confirm
    // against TcpTransport.
    private volatile WireFormatInfo remoteWireformatInfo;
    private volatile BrokerInfo remoteInfo;
    private volatile Exception failureCause;

    private final AtomicInteger requestIdGenerator = new AtomicInteger(1);

    /** Latches of in-flight {@link #request(Command, long, TimeUnit)} calls keyed by command id. */
    private final Map<Integer, CountDownLatch> requestMap = new ConcurrentHashMap<>();

    /** Most recent {@link Response} seen by {@link #onCommand(Object)}. */
    protected volatile Command latest;

    /**
     * Messages taken from every {@link MessageDispatch} received, in arrival order.
     * NOTE(review): LinkedList is not thread-safe; kept because it accepts null
     * elements, which a dispatch without a message would presumably produce - verify
     * before swapping in a concurrent queue.
     */
    protected final Queue<Message> messages = new LinkedList<>();

    /**
     * Starts the broker returned by {@link #createBroker()} and builds the wire format
     * using {@link #getOpenWireVersion()} and {@link #isTightEncodingEnabled()} with
     * caching disabled.
     *
     * @throws Exception if the broker cannot be created or started
     */
    @Before
    public void setUp() throws Exception {
        brokerService = createBroker();
        brokerService.start();
        brokerService.waitUntilStarted();

        factory = new OpenWireFormatFactory();
        factory.setVersion(getOpenWireVersion());
        factory.setCacheEnabled(false);
        factory.setTightEncodingEnabled(isTightEncodingEnabled());

        wireFormat = factory.createWireFormat();
    }

    /**
     * Disconnects the client transport and stops the broker.
     *
     * @throws Exception if disconnecting or stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            disconnect();
        } finally {
            // Stop the broker even when the disconnect fails so that it does not
            // outlive the test that created it.
            if (brokerService != null) {
                brokerService.stop();
                brokerService.waitUntilStopped();
            }
        }
    }

    /**
     * @return the OpenWire version that is set on the wire format factory and used as the
     *         minimum version accepted from the remote peer
     */
    protected abstract int getOpenWireVersion();

    /**
     * @return the tight encoding setting applied to the wire format factory
     */
    protected abstract boolean isTightEncodingEnabled();

    /**
     * Creates and starts a transport to {@link #connectionURI} with this object as its
     * listener and sends the preferred {@link WireFormatInfo}. Use
     * {@link #awaitConnected(long, TimeUnit)} to wait for the remote WireFormatInfo.
     *
     * @throws Exception if the transport cannot be started or the send fails
     */
    protected void connect() throws Exception {
        connected = new CountDownLatch(1);

        transport = new TcpTransport(wireFormat, connectionURI);
        transport.setTransportListener(this);
        transport.start();

        transport.oneway(wireFormat.getPreferedWireFormatInfo());
    }

    /**
     * Sends a {@link ShutdownInfo}, pauses briefly and stops the transport. Does nothing
     * when no transport exists or it is not started.
     *
     * @throws Exception if sending the shutdown command or stopping the transport fails
     */
    protected void disconnect() throws Exception {
        if (transport != null && transport.isStarted()) {
            try {
                transport.oneway(new ShutdownInfo());
                Thread.sleep(50);
            } finally {
                // Always stop the transport, even if the shutdown command could not be sent.
                transport.stop();
            }
        }
    }

    /**
     * Sends the command with a freshly generated command id and the response-required
     * flag set, then waits for a {@link Response} carrying that id as correlation id.
     *
     * @param command the command to send
     * @param timeout maximum time to wait for the response
     * @param units unit of {@code timeout}
     * @return {@code true} if the response arrived within the timeout, {@code false} otherwise
     * @throws Exception if the send fails or the wait is interrupted
     */
    protected boolean request(Command command, long timeout, TimeUnit units) throws Exception {
        final int commandId = requestIdGenerator.getAndIncrement();
        command.setCommandId(commandId);
        command.setResponseRequired(true);

        final CountDownLatch complete = new CountDownLatch(1);
        requestMap.put(commandId, complete);
        try {
            transport.oneway(command);
            return complete.await(timeout, units);
        } finally {
            // Drop the latch on success, timeout and failure alike so that finished
            // requests do not accumulate in the map.
            requestMap.remove(commandId);
        }
    }

    /**
     * Sends the command with a freshly generated command id and the response-required
     * flag cleared, without waiting for anything.
     *
     * @param command the command to send
     * @throws Exception if the send fails
     */
    protected void requestNoResponse(Command command) throws Exception {
        command.setCommandId(requestIdGenerator.getAndIncrement());
        command.setResponseRequired(false);
        transport.oneway(command);
    }

    /**
     * Waits for the latch released by {@link #handleWireFormatInfo(WireFormatInfo)}.
     * {@link #connect()} must have been called first since it creates the latch.
     *
     * @param time maximum time to wait
     * @param unit unit of {@code time}
     * @return {@code true} if the remote WireFormatInfo was handled within the given time
     * @throws InterruptedException if the waiting thread is interrupted
     */
    protected boolean awaitConnected(long time, TimeUnit unit) throws InterruptedException {
        return connected.await(time, unit);
    }

    /**
     * Builds the non-persistent, JMX enabled broker used by the test and stores the
     * publishable URI of its TCP connector in {@link #connectionURI}. The broker is
     * returned unstarted.
     *
     * @return the configured broker
     * @throws Exception if the connector cannot be added
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(false);
        broker.setAdvisorySupport(false);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setUseJmx(true);

        TransportConnector connector = broker.addConnector("tcp://0.0.0.0:0?transport.trace=true&trace=true");
        connectionURI = connector.getPublishableConnectURI();
        LOG.debug("Using openwire port: {}", connectionURI);
        return broker;
    }

    /**
     * Dispatches an incoming command to the matching handler. Responses are stored in
     * {@link #latest} and release the latch of the matching pending request, dispatched
     * messages are appended to {@link #messages}, anything else is only logged. An
     * exception thrown while handling is logged and recorded as the failure cause.
     *
     * @param command the command received from the transport
     */
    @Override
    public void onCommand(Object command) {
        try {
            if (command instanceof WireFormatInfo) {
                handleWireFormatInfo((WireFormatInfo) command);
            } else if (command instanceof KeepAliveInfo) {
                handleKeepAliveInfo((KeepAliveInfo) command);
            } else if (command instanceof BrokerInfo) {
                handleBrokerInfo((BrokerInfo) command);
            } else if (command instanceof Response) {
                Response response = (Response) command;
                this.latest = response;
                LOG.info("Received response for request: {}, response = {}", response.getCorrelationId(), response);
                // No latch is found when the requester already timed out and removed it.
                CountDownLatch done = requestMap.get(response.getCorrelationId());
                if (done != null) {
                    done.countDown();
                }
            } else if (command instanceof MessageDispatch) {
                LOG.info("Received new MessageDispatch: {}", command);
                MessageDispatch dispatch = (MessageDispatch) command;
                messages.add(dispatch.getMessage());
            } else {
                LOG.info("Received unknown command: {}", command);
            }
        } catch (Exception e) {
            LOG.warn("Failed to process received command: {}", command, e);
            failureCause = e;
        }
    }

    /**
     * Records the error so that {@link #isFailed()} reports {@code true}.
     *
     * @param error the error being reported
     */
    @Override
    public void onException(IOException error) {
        failureCause = error;
    }

    /** No-op. */
    @Override
    public void transportInterupted() {
    }

    /** No-op. */
    @Override
    public void transportResumed() {
    }

    /**
     * @return the last WireFormatInfo received from the remote peer, or {@code null} if none was received
     */
    public WireFormatInfo getRemoteWireFormatInfo() {
        return this.remoteWireformatInfo;
    }

    /**
     * @return the last BrokerInfo received from the remote peer, or {@code null} if none was received
     */
    public BrokerInfo getRemoteBrokerInfo() {
        return this.remoteInfo;
    }

    /**
     * @return the current value of {@link #latest}, or {@code null} if nothing was stored yet
     */
    public Command getLastCommandReceived() {
        return this.latest;
    }

    /**
     * @return {@code true} once an error was reported through {@link #onException(IOException)}
     *         or command handling threw an exception
     */
    public boolean isFailed() {
        return this.failureCause != null;
    }

    /**
     * Stores the remote WireFormatInfo, reports invalid magic or a version below
     * {@link #getOpenWireVersion()} through {@link #onException(IOException)},
     * renegotiates the local wire format and releases the connected latch.
     * <p>
     * Renegotiation and the latch release happen even after a problem was reported, so
     * callers should check {@link #isFailed()} after {@link #awaitConnected(long, TimeUnit)}.
     *
     * @param info the WireFormatInfo received from the remote peer
     * @throws Exception if the renegotiation fails
     */
    protected void handleWireFormatInfo(WireFormatInfo info) throws Exception {
        LOG.info("Received remote WireFormatInfo: {}", info);
        this.remoteWireformatInfo = info;

        LOG.debug("{} before negotiation: {}", this, wireFormat);

        if (!info.isValid()) {
            onException(new IOException("Remote wire format magic is invalid"));
        } else if (info.getVersion() < getOpenWireVersion()) {
            onException(new IOException("Remote wire format (" + info.getVersion() +
                        ") is lower the minimum version required (" + getOpenWireVersion() + ")"));
        }

        wireFormat.renegotiateWireFormat(info);
        LOG.debug("{} after negotiation: {}", this, wireFormat);

        connected.countDown();
    }

    /**
     * Answers a KeepAliveInfo with a new KeepAliveInfo when the received one requires a response.
     *
     * @param info the KeepAliveInfo received from the remote peer
     * @throws Exception if sending the reply fails
     */
    protected void handleKeepAliveInfo(KeepAliveInfo info) throws Exception {
        LOG.info("Received remote KeepAliveInfo: {}", info);
        if (info.isResponseRequired()) {
            transport.oneway(new KeepAliveInfo());
        }
    }

    /**
     * Stores the BrokerInfo for {@link #getRemoteBrokerInfo()}.
     *
     * @param info the BrokerInfo received from the remote peer
     * @throws Exception never thrown here; declared so that overrides may throw
     */
    protected void handleBrokerInfo(BrokerInfo info) throws Exception {
        LOG.info("Received remote BrokerInfo: {}", info);
        this.remoteInfo = info;
    }

    /**
     * Creates a JMX proxy for the named Queue registered under broker name "localhost".
     *
     * @param name the destination name
     * @return a proxy for the Queue's management bean
     * @throws MalformedObjectNameException if the resulting JMX object name is invalid
     * @throws JMSException declared for callers; not thrown by the visible code
     */
    protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
        return newDestinationProxy("Queue", name);
    }

    /**
     * Creates a JMX proxy for the named Topic registered under broker name "localhost".
     * <p>
     * NOTE(review): the proxy is typed as {@link QueueViewMBean} even though the object
     * name addresses a Topic; the return type is kept for existing callers.
     *
     * @param name the destination name
     * @return a proxy for the Topic's management bean
     * @throws MalformedObjectNameException if the resulting JMX object name is invalid
     * @throws JMSException declared for callers; not thrown by the visible code
     */
    protected QueueViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
        return newDestinationProxy("Topic", name);
    }

    /** Builds the destination ObjectName for the given type and name and proxies it through the broker's management context. */
    private QueueViewMBean newDestinationProxy(String destinationType, String destinationName)
            throws MalformedObjectNameException, JMSException {
        ObjectName objectName = new ObjectName(
            "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=" + destinationType +
            ",destinationName=" + destinationName);
        return (QueueViewMBean) brokerService.getManagementContext()
                .newProxyInstance(objectName, QueueViewMBean.class, true);
    }
}
