/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.stream.camel;

import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ServiceStatus;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.stream.StreamAdapter;
import org.apache.ignite.stream.StreamMultipleTupleExtractor;
import org.apache.ignite.stream.StreamSingleTupleExtractor;

/**
 * This streamer consumes messages from an Apache Camel consumer endpoint and feeds them into an Ignite data streamer.
 *
 * The only mandatory properties are {@link #endpointUri} and the appropriate stream tuple extractor (either {@link
 * StreamSingleTupleExtractor} or {@link StreamMultipleTupleExtractor}.
 *
 * The user can also provide a custom {@link CamelContext} in case they want to attach custom components, a {@link
 * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
 *
 * @see <a href="http://camel.apache.org">Apache Camel</a>
 * @see <a href="http://camel.apache.org/components.html">Apache Camel components</a>
 */
public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implements Processor {
    /** Logger. */
    private IgniteLogger log;

    /** The Camel Context. */
    private CamelContext camelCtx;

    /** The endpoint URI to consume from. */
    private String endpointUri;

    /** Camel endpoint. */
    private Endpoint endpoint;

    /** Camel consumer. */
    private Consumer consumer;

    /** A {@link Processor} to generate the response. */
    private Processor resProc;

    /**
     * Starts the streamer.
     *
     * @throws IgniteException In cases when failed to start the streamer.
     */
    public void start() throws IgniteException {
        // Ensure that the endpoint URI is provided.
        A.notNullOrEmpty(endpointUri, "endpoint URI must be provided");

        // Check that one and only one tuple extractor is provided.
        A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null),
            "tuple extractor missing");

        // If a custom CamelContext is not provided, initialize one.
        if (camelCtx == null)
            camelCtx = new DefaultCamelContext();

        // If the camel context is not started then simply start it up
        if (!camelCtx.isStarted())
            camelCtx.start();

        if (!camelCtx.isRunAllowed())
            throw new IgniteException("Failed to start Camel streamer (CamelContext not in a runnable state).");

        log = getIgnite().log();

        // Instantiate the Camel endpoint.
        try {
            endpoint = CamelContextHelper.getMandatoryEndpoint(camelCtx, endpointUri);
        }
        catch (Exception e) {
            U.error(log, e);

            camelCtx.stop();
            throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
        }

        // Create the Camel consumer.
        try {
            consumer = endpoint.createConsumer(this);
        }
        catch (Exception e) {
            U.error(log, e);

            camelCtx.stop();
            throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
        }

        // Start the Camel services.
        try {
            ServiceHelper.startService(camelCtx, endpoint, consumer);
        }
        catch (Exception e) {
            U.error(log, e);

            camelCtx.stop();
            try {
                ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);

                consumer = null;
                endpoint = null;
            }
            catch (Exception e1) {
                throw new IgniteException("Failed to start Camel streamer; failed to stop the context, endpoint or " +
                    "consumer during rollback of failed initialization [errMsg=" + e.getMessage() + ", stopErrMsg=" +
                    e1.getMessage() + ']');
            }

            throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
        }

        U.log(log, "Started Camel streamer consuming from endpoint URI: " + endpointUri);
    }

    /**
     * Stops the streamer.
     *
     * @throws IgniteException In cases if failed to stop the streamer.
     */
    public void stop() throws IgniteException {
        // If the Camel Context is stopping or stopped, reject this call to stop.
        if (camelCtx.getStatus() == ServiceStatus.Stopped || camelCtx.getStatus() == ServiceStatus.Stopping)
            throw new IgniteException("Failed to stop Camel streamer (CamelContext already stopped or stopping).");

        // Stop Camel services.
        try {
            ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);
        }
        catch (Exception e) {
            throw new IgniteException("Failed to stop Camel streamer [errMsg=" + e.getMessage() + ']');
        }

        U.log(log, "Stopped Camel streamer, formerly consuming from endpoint URI: " + endpointUri);
    }

    /**
     * Processes the incoming {@link Exchange} and adds the tuple(s) to the underlying streamer.
     *
     * @param exchange The Camel Exchange.
     */
    @Override public void process(Exchange exchange) throws Exception {
        // Extract and insert the tuple(s).
        if (getMultipleTupleExtractor() == null) {
            Map.Entry<K, V> entry = getSingleTupleExtractor().extract(exchange);
            getStreamer().addData(entry);
        }
        else {
            Map<K, V> entries = getMultipleTupleExtractor().extract(exchange);
            getStreamer().addData(entries);
        }

        // If the user has set a response processor, invoke it before finishing.
        if (resProc != null)
            resProc.process(exchange);
    }

    /**
     * Gets the underlying {@link CamelContext}, whether created automatically by Ignite or the context specified by the
     * user.
     *
     * @return The Camel Context.
     */
    public CamelContext getCamelContext() {
        return camelCtx;
    }

    /**
     * Explicitly sets the {@link CamelContext} to use.
     *
     * Doing so gives the user the opportunity to attach custom components, a {@link
     * org.apache.camel.component.properties.PropertiesComponent}, set tracers, management strategies, etc.
     *
     * @param camelCtx The Camel Context to use. In most cases, an instance of {@link DefaultCamelContext}.
     */
    public void setCamelContext(CamelContext camelCtx) {
        this.camelCtx = camelCtx;
    }

    /**
     * Gets the endpoint URI from which to consume.
     *
     * @return The endpoint URI.
     */
    public String getEndpointUri() {
        return endpointUri;
    }

    /**
     * Sets the endpoint URI from which to consume. <b>Mandatory.</b>
     *
     * @param endpointUri The endpoint URI.
     */
    public void setEndpointUri(String endpointUri) {
        this.endpointUri = endpointUri;
    }

    /**
     * Gets the {@link Processor} used to generate the response.
     *
     * @return The {@link Processor}.
     */
    public Processor getResponseProcessor() {
        return resProc;
    }

    /**
     * Sets the {@link Processor} used to generate the response.
     *
     * @param resProc The {@link Processor}.
     */
    public void setResponseProcessor(Processor resProc) {
        this.resProc = resProc;
    }
}
