/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.disruptor;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import com.lmax.disruptor.InsufficientCapacityException;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.WaitForTaskToComplete;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The disruptor component provides asynchronous SEDA behavior using LMAX Disruptor.
 *
 * This component works much as the standard SEDA Component, but utilizes a Disruptor
 * instead of a BlockingQueue utilized by the standard SEDA.
 */
@ManagedResource(description = "Managed Disruptor Endpoint")
@UriEndpoint(scheme = "disruptor,disruptor-vm", title = "Disruptor,Disruptor VM", syntax = "disruptor:name", consumerClass = DisruptorConsumer.class, label = "endpoint")
public class DisruptorEndpoint extends DefaultEndpoint implements MultipleConsumersSupport {
    public static final String DISRUPTOR_IGNORE_EXCHANGE = "disruptor.ignoreExchange";
    private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorEndpoint.class);

    private final Set<DisruptorProducer> producers = new CopyOnWriteArraySet<DisruptorProducer>();
    private final Set<DisruptorConsumer> consumers = new CopyOnWriteArraySet<DisruptorConsumer>();
    private final DisruptorReference disruptorReference;

    @UriPath(description = "Name of queue") @Metadata(required = "true")
    private String name;
    @UriParam(label = "consumer", defaultValue = "1")
    private final int concurrentConsumers;
    @UriParam(label = "consumer")
    private final boolean multipleConsumers;
    @UriParam(label = "producer", defaultValue = "IfReplyExpected")
    private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
    @UriParam(label = "producer", defaultValue = "30000")
    private long timeout = 30000;
    @UriParam(defaultValue = "" + DisruptorComponent.DEFAULT_BUFFER_SIZE)
    private int size;
    @UriParam(label = "producer")
    private boolean blockWhenFull;
    @UriParam(label = "consumer", defaultValue = "Blocking")
    private DisruptorWaitStrategy waitStrategy;
    @UriParam(label = "producer", defaultValue = "Multi")
    private DisruptorProducerType producerType;

    public DisruptorEndpoint(final String endpointUri, final Component component,
                             final DisruptorReference disruptorReference, final int concurrentConsumers,
                             final boolean multipleConsumers, boolean blockWhenFull) throws Exception {
        super(endpointUri, component);
        this.disruptorReference = disruptorReference;
        this.name = disruptorReference.getName();
        this.concurrentConsumers = concurrentConsumers;
        this.multipleConsumers = multipleConsumers;
        this.blockWhenFull = blockWhenFull;
    }

    @ManagedAttribute(description = "Queue name")
    public String getName() {
        return name;
    }

    @ManagedAttribute(description = "Buffer max capacity")
    public int getBufferSize() {
        return disruptorReference.getBufferSize();
    }

    @ManagedAttribute(description = "Remaining capacity in ring buffer")
    public long getRemainingCapacity() throws DisruptorNotStartedException {
        return getDisruptor().getRemainingCapacity();
    }

    @ManagedAttribute(description = "Amount of pending exchanges waiting for consumption in ring buffer")
    public long getPendingExchangeCount() throws DisruptorNotStartedException {
        return getDisruptor().getPendingExchangeCount();
    }

    /**
     * Number of concurrent threads processing exchanges.
     */
    @ManagedAttribute(description = "Number of concurrent consumers")
    public int getConcurrentConsumers() {
        return concurrentConsumers;
    }

    @ManagedAttribute(description = "Option to specify whether the caller should wait for the async task to complete or not before continuing")
    public WaitForTaskToComplete getWaitForTaskToComplete() {
        return waitForTaskToComplete;
    }

    /**
     * Option to specify whether the caller should wait for the async task to complete or not before continuing.
     * The following three options are supported: Always, Never or IfReplyExpected. The first two values are self-explanatory.
     * The last value, IfReplyExpected, will only wait if the message is Request Reply based.
     */
    public void setWaitForTaskToComplete(final WaitForTaskToComplete waitForTaskToComplete) {
        this.waitForTaskToComplete = waitForTaskToComplete;
    }

    @ManagedAttribute(description = "Timeout (in milliseconds) before a producer will stop waiting for an asynchronous task to complete")
    public long getTimeout() {
        return timeout;
    }

    /**
     * Timeout (in milliseconds) before a producer will stop waiting for an asynchronous task to complete.
     * You can disable timeout by using 0 or a negative value.
     */
    public void setTimeout(final long timeout) {
        this.timeout = timeout;
    }

    @ManagedAttribute(description = "The maximum capacity of the Disruptors ringbuffer")
    public int getSize() {
        return size;
    }

    /**
     * The maximum capacity of the Disruptors ringbuffer
     * Will be effectively increased to the nearest power of two.
     * Notice: Mind if you use this option, then its the first endpoint being created with the queue name,
     * that determines the size. To make sure all endpoints use same size, then configure the size option
     * on all of them, or the first endpoint being created.
     */
    public void setSize(int size) {
        this.size = size;
    }

    @Override
    @ManagedAttribute(description = "Specifies whether multiple consumers are allowed")
    public boolean isMultipleConsumersSupported() {
        return isMultipleConsumers();
    }

    /**
     * Specifies whether multiple consumers are allowed.
     * If enabled, you can use Disruptor for Publish-Subscribe messaging.
     * That is, you can send a message to the queue and have each consumer receive a copy of the message.
     * When enabled, this option should be specified on every consumer endpoint.
     */
    public boolean isMultipleConsumers() {
        return multipleConsumers;
    }

    /**
     * Returns the current active consumers on this endpoint
     */
    public Set<DisruptorConsumer> getConsumers() {
        return Collections.unmodifiableSet(consumers);
    }

    /**
     * Returns the current active producers on this endpoint
     */
    public Set<DisruptorProducer> getProducers() {
        return Collections.unmodifiableSet(producers);
    }

    @ManagedAttribute
    public boolean isBlockWhenFull() {
        return blockWhenFull;
    }

    /**
     * Whether a thread that sends messages to a full Disruptor will block until the ringbuffer's capacity is no longer exhausted.
     * By default, the calling thread will block and wait until the message can be accepted.
     * By disabling this option, an exception will be thrown stating that the queue is full.
     */
    public void setBlockWhenFull(boolean blockWhenFull) {
        this.blockWhenFull = blockWhenFull;
    }

    @ManagedAttribute(description = "Defines the strategy used by consumer threads to wait on new exchanges to be published")
    public DisruptorWaitStrategy getWaitStrategy() {
        return waitStrategy;
    }

    /**
     * Defines the strategy used by consumer threads to wait on new exchanges to be published.
     * The options allowed are:Blocking, Sleeping, BusySpin and Yielding.
     */
    public void setWaitStrategy(DisruptorWaitStrategy waitStrategy) {
        this.waitStrategy = waitStrategy;
    }

    @ManagedAttribute(description = " Defines the producers allowed on the Disruptor")
    public DisruptorProducerType getProducerType() {
        return producerType;
    }

    /**
     * Defines the producers allowed on the Disruptor.
     * The options allowed are: Multi to allow multiple producers and Single to enable certain optimizations only
     * allowed when one concurrent producer (on one thread or otherwise synchronized) is active.
     */
    public void setProducerType(DisruptorProducerType producerType) {
        this.producerType = producerType;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    @Override
    public Producer createProducer() throws Exception {
        if (getProducers().size() == 1 && getDisruptor().getProducerType() == DisruptorProducerType.Single) {
            throw new IllegalStateException(
                    "Endpoint can't support multiple producers when ProducerType SINGLE is configured");
        }
        return new DisruptorProducer(this, getWaitForTaskToComplete(), getTimeout(), isBlockWhenFull());
    }

    @Override
    public Consumer createConsumer(final Processor processor) throws Exception {
        return new DisruptorConsumer(this, processor);
    }

    @Override
    protected void doStart() throws Exception {
        // notify reference we are shutting down this endpoint
        disruptorReference.addEndpoint(this);
        super.doStart();
    }

    @Override
    protected void doStop() throws Exception {
        // notify reference we are shutting down this endpoint
        disruptorReference.removeEndpoint(this);
        super.doStop();
    }

    @Override
    protected void doShutdown() throws Exception {
        // notify component we are shutting down this endpoint
        if (getComponent() != null) {
            getComponent().onShutdownEndpoint(this);
        }

        super.doShutdown();
    }

    @Override
    public DisruptorComponent getComponent() {
        return (DisruptorComponent)super.getComponent();
    }

    void onStarted(final DisruptorConsumer consumer) throws Exception {
        synchronized (this) {
            // validate multiple consumers has been enabled is necessary
            if (!consumers.isEmpty() && !isMultipleConsumersSupported()) {
                throw new IllegalStateException(
                        "Multiple consumers for the same endpoint is not allowed: " + this);
            }
            if (consumers.add(consumer)) {
                LOGGER.debug("Starting consumer {} on endpoint {}", consumer, getEndpointUri());
                getDisruptor().reconfigure();
            } else {
                LOGGER.debug("Tried to start Consumer {} on endpoint {} but it was already started", consumer, getEndpointUri());
            }
        }
    }


    void onStopped(final DisruptorConsumer consumer) throws Exception {
        synchronized (this) {
            if (consumers.remove(consumer)) {
                LOGGER.debug("Stopping consumer {} on endpoint {}", consumer, getEndpointUri());
                getDisruptor().reconfigure();
            } else {
                LOGGER.debug("Tried to stop Consumer {} on endpoint {} but it was already stopped", consumer, getEndpointUri());
            }
        }
    }

    void onStarted(final DisruptorProducer producer) {
        producers.add(producer);
    }

    void onStopped(final DisruptorProducer producer) {
        producers.remove(producer);
    }

    Map<DisruptorConsumer, Collection<LifecycleAwareExchangeEventHandler>> createConsumerEventHandlers() {
        Map<DisruptorConsumer, Collection<LifecycleAwareExchangeEventHandler>> result =
                new HashMap<DisruptorConsumer, Collection<LifecycleAwareExchangeEventHandler>>();

        for (final DisruptorConsumer consumer : consumers) {
            result.put(consumer, consumer.createEventHandlers(concurrentConsumers));
        }

        return result;
    }

    /**
     * Called by DisruptorProducers to publish new exchanges on the RingBuffer, blocking when full
     */
    void publish(final Exchange exchange) throws DisruptorNotStartedException {
        disruptorReference.publish(exchange);
    }

    /**
     * Called by DisruptorProducers to publish new exchanges on the RingBuffer, throwing InsufficientCapacityException
     * when full
     *
     * @throws InsufficientCapacityException when the Ringbuffer is full.
     */
    void tryPublish(final Exchange exchange) throws DisruptorNotStartedException, InsufficientCapacityException {
        disruptorReference.tryPublish(exchange);
    }

    DisruptorReference getDisruptor() {
        return disruptorReference;
    }

    @Override
    public boolean equals(Object object) {
        boolean result = super.equals(object);
        return result && getCamelContext().equals(((DisruptorEndpoint)object).getCamelContext());
    }
    
    @Override
    public int hashCode() {
        return getEndpointUri().hashCode() * 37 + getCamelContext().hashCode();
    }
}
