/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.standard;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.standard.util.JmsFactory;
import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
import org.apache.nifi.util.StopWatch;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.nifi.processors.standard.util.JmsProperties.ACKNOWLEDGEMENT_MODE;
import static org.apache.nifi.processors.standard.util.JmsProperties.ACK_MODE_CLIENT;
import static org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE;
import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME;
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_TO_ATTRIBUTES;
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE;
import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;

public abstract class JmsConsumer extends AbstractProcessor {

    public static final String MAP_MESSAGE_PREFIX = "jms.mapmessage.";

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All FlowFiles are routed to success")
            .build();

    private final Set<Relationship> relationships;
    private final List<PropertyDescriptor> propertyDescriptors;

    public JmsConsumer() {
        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(rels);

        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(JMS_PROVIDER);
        descriptors.add(URL);
        descriptors.add(DESTINATION_NAME);
        descriptors.add(TIMEOUT);
        descriptors.add(BATCH_SIZE);
        descriptors.add(USERNAME);
        descriptors.add(PASSWORD);
        descriptors.add(SSL_CONTEXT_SERVICE);
        descriptors.add(ACKNOWLEDGEMENT_MODE);
        descriptors.add(MESSAGE_SELECTOR);
        descriptors.add(JMS_PROPS_TO_ATTRIBUTES);
        descriptors.add(CLIENT_ID_PREFIX);
        this.propertyDescriptors = Collections.unmodifiableList(descriptors);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    public void consume(final ProcessContext context, final ProcessSession session, final WrappedMessageConsumer wrappedConsumer) throws ProcessException {
        final ComponentLog logger = getLogger();

        final MessageConsumer consumer = wrappedConsumer.getConsumer();
        final boolean clientAcknowledge = context.getProperty(ACKNOWLEDGEMENT_MODE).getValue().equalsIgnoreCase(ACK_MODE_CLIENT);
        final long timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
        final boolean addAttributes = context.getProperty(JMS_PROPS_TO_ATTRIBUTES).asBoolean();
        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();

        final JmsProcessingSummary processingSummary = new JmsProcessingSummary();

        final StopWatch stopWatch = new StopWatch(true);
        for (int i = 0; i < batchSize; i++) {

            final Message message;
            try {
                // If we haven't received a message, wait until one is available. If we have already received at least one
                // message, then we are not willing to wait for more to become available, but we are willing to keep receiving
                // all messages that are immediately available.
                if (processingSummary.getMessagesReceived() == 0) {
                    message = consumer.receive(timeout);
                } else {
                    message = consumer.receiveNoWait();
                }
            } catch (final JMSException e) {
                logger.error("Failed to receive JMS Message due to {}", e);
                wrappedConsumer.close(logger);
                break;
            }

            if (message == null) { // if no messages, we're done
                break;
            }

            try {
                processingSummary.add(map2FlowFile(context, session, message, addAttributes, logger));
            } catch (Exception e) {
                logger.error("Failed to receive JMS Message due to {}", e);
                wrappedConsumer.close(logger);
                break;
            }
        }

        if (processingSummary.getFlowFilesCreated() == 0) {
            context.yield();
            return;
        }

        session.commitAsync(() -> {
            // if we need to acknowledge the messages, do so now.
            final Message lastMessage = processingSummary.getLastMessageReceived();
            if (clientAcknowledge && lastMessage != null) {
                try {
                    lastMessage.acknowledge();  // acknowledge all received messages by acknowledging only the last.
                } catch (final JMSException e) {
                    logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}",
                        new Object[]{processingSummary.getMessagesReceived(), e});
                }
            }
        });

        stopWatch.stop();
        if (processingSummary.getFlowFilesCreated() > 0) {
            final float secs = (stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
            float messagesPerSec = (processingSummary.getMessagesReceived()) / secs;
            final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived());
            logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}",
                    new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
        }
    }

    public static JmsProcessingSummary map2FlowFile(final ProcessContext context, final ProcessSession session, final Message message, final boolean addAttributes, ComponentLog logger)
            throws Exception {

        // Currently not very useful, because always one Message == one FlowFile
        final AtomicInteger msgsThisFlowFile = new AtomicInteger(1);

        FlowFile flowFile = session.create();
        try {
            // MapMessage is exception, add only name-value pairs to FlowFile attributes
            if (message instanceof MapMessage) {
                MapMessage mapMessage = (MapMessage) message;
                flowFile = session.putAllAttributes(flowFile, createMapMessageValues(mapMessage));
            } else { // all other message types, write Message body to FlowFile content
                flowFile = session.write(flowFile, new OutputStreamCallback() {
                    @Override
                    public void process(final OutputStream rawOut) throws IOException {
                        try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
                            final byte[] messageBody = JmsFactory.createByteArray(message);
                            out.write(messageBody);
                        } catch (final JMSException e) {
                            throw new ProcessException("Failed to receive JMS Message due to " + e.getMessage(), e);
                        }
                    }
                });
            }

            if (addAttributes) {
                flowFile = session.putAllAttributes(flowFile, JmsFactory.createAttributeMap(message));
            }

            session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue());
            session.transfer(flowFile, REL_SUCCESS);
            logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'",
                    new Object[]{flowFile, msgsThisFlowFile.get()});

            return new JmsProcessingSummary(flowFile.getSize(), message, flowFile);

        } catch (Exception e) {
            session.remove(flowFile);
            throw e;
        }
    }

    public static Map<String, String> createMapMessageValues(final MapMessage mapMessage) throws JMSException {
        final Map<String, String> valueMap = new HashMap<>();

        final Enumeration<?> enumeration = mapMessage.getMapNames();
        while (enumeration.hasMoreElements()) {
            final String name = (String) enumeration.nextElement();

            final Object value = mapMessage.getObject(name);
            if (value == null) {
                valueMap.put(MAP_MESSAGE_PREFIX + name, "");
            } else {
                valueMap.put(MAP_MESSAGE_PREFIX + name, value.toString());
            }
        }

        return valueMap;
    }

}
