/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.xml;

import javanet.staxutils.IndentingXMLStreamWriter;
import org.apache.nifi.record.NullSuppression;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RawRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Pattern;

import static org.apache.nifi.xml.XMLRecordSetWriter.RECORD_TAG_NAME;
import static org.apache.nifi.xml.XMLRecordSetWriter.ROOT_TAG_NAME;


public class WriteXMLResult extends AbstractRecordSetWriter implements RecordSetWriter, RawRecordWriter {
    private static final Pattern TAG_NAME_CHARS_TO_STRIP = Pattern.compile("[/<>!&'\"]");

    private final RecordSchema recordSchema;
    private final SchemaAccessWriter schemaAccess;
    private final XMLStreamWriter writer;
    private final NullSuppression nullSuppression;
    private final boolean omitDeclaration;
    private final ArrayWrapping arrayWrapping;
    private final String arrayTagName;
    private final String recordTagName;
    private final String rootTagName;
    private final boolean allowWritingMultipleRecords;
    private boolean hasWrittenRecord;

    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;

    public WriteXMLResult(final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint, final boolean omitDeclaration,
                          final NullSuppression nullSuppression, final ArrayWrapping arrayWrapping, final String arrayTagName, final String rootTagName, final String recordTagName,
                          final String charSet, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {

        super(out);

        this.recordSchema = recordSchema;
        this.schemaAccess = schemaAccess;
        this.nullSuppression = nullSuppression;

        this.omitDeclaration = omitDeclaration;

        this.arrayWrapping = arrayWrapping;
        this.arrayTagName = arrayTagName;

        this.rootTagName = rootTagName;

        if (recordTagName != null) {
            this.recordTagName = recordTagName;
        } else {
            Optional<String> recordTagNameOptional = recordSchema.getSchemaName().isPresent()? recordSchema.getSchemaName() : recordSchema.getIdentifier().getName();
            if (recordTagNameOptional.isPresent()) {
                this.recordTagName = recordTagNameOptional.get();
            } else {
                final String message = "The property '" + RECORD_TAG_NAME.getDisplayName() +
                    "' has not been set and the writer does not find a record name in the schema.";
                throw new IOException(message);
            }
        }

        this.allowWritingMultipleRecords = !(this.rootTagName == null);
        hasWrittenRecord = false;

        // Use DateFormat with default TimeZone to avoid unexpected conversion of year-month-day
        final DateFormat df = dateFormat == null ? null : new SimpleDateFormat(dateFormat);
        final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
        final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);

        LAZY_DATE_FORMAT = () -> df;
        LAZY_TIME_FORMAT = () -> tf;
        LAZY_TIMESTAMP_FORMAT = () -> tsf;

        try {
            XMLOutputFactory factory = XMLOutputFactory.newInstance();

            if (prettyPrint) {
                writer = new IndentingXMLStreamWriter(factory.createXMLStreamWriter(out, charSet));
            } else {
                writer = factory.createXMLStreamWriter(out, charSet);
            }

        } catch (XMLStreamException e) {
            throw new IOException(e.getMessage());
        }
    }

    @Override
    protected void onBeginRecordSet() throws IOException {

        final OutputStream out = getOutputStream();
        schemaAccess.writeHeader(recordSchema, out);

        try {
            if (!omitDeclaration) {
                writer.writeStartDocument();
            }

            if (allowWritingMultipleRecords) {
                writer.writeStartElement(rootTagName);
            }

        } catch (XMLStreamException e) {
            throw new IOException(e.getMessage());
        }
    }

    @Override
    protected Map<String, String> onFinishRecordSet() throws IOException {

        try {
            if (allowWritingMultipleRecords) {
                writer.writeEndElement();
            }

            writer.writeEndDocument();
        } catch (XMLStreamException e) {
            throw new IOException(e.getMessage());
        }
        return schemaAccess.getAttributes(recordSchema);
    }

    @Override
    public void close() throws IOException {

        try {
            writer.close();

        } catch (XMLStreamException e) {
            throw new IOException(e.getMessage());
        }

        super.close();
    }

    @Override
    public void flush() throws IOException {

        try {
            writer.flush();

        } catch (XMLStreamException e) {
            throw new IOException(e.getMessage());
        }
    }

    private void checkWritingMultipleRecords() throws IOException {
        if (!allowWritingMultipleRecords && hasWrittenRecord) {
            final String message = "The writer attempts to write multiple record although property \'" + ROOT_TAG_NAME.getDisplayName() +
                "\' has not been set. If the XMLRecordSetWriter is supposed to write multiple records into one FlowFile, this property is required to be configured.";
            throw new IOException(message);
        }
    }

    @Override
    protected Map<String, String> writeRecord(Record record) throws IOException {

        if (!isActiveRecordSet()) {
            schemaAccess.writeHeader(recordSchema, getOutputStream());
        }

        checkWritingMultipleRecords();

        Deque<String> tagsToOpen = new ArrayDeque<>();

        try {
            tagsToOpen.addLast(recordTagName);

            boolean closingTagRequired = iterateThroughRecordUsingSchema(tagsToOpen, record, recordSchema);
            if (closingTagRequired) {
                writer.writeEndElement();
                hasWrittenRecord = true;
            }

        } catch (XMLStreamException e) {
            throw new IOException(e.getMessage());
        }
        return schemaAccess.getAttributes(recordSchema);
    }

    private boolean iterateThroughRecordUsingSchema(Deque<String> tagsToOpen, Record record, RecordSchema schema) throws XMLStreamException {

        boolean loopHasWritten = false;
        for (RecordField field : schema.getFields()) {

            String fieldName = field.getFieldName();
            DataType dataType = field.getDataType();
            Object value = record.getValue(field);

            final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
            final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);

            if (coercedValue != null) {
                boolean hasWritten = writeFieldForType(tagsToOpen, coercedValue, chosenDataType, fieldName);
                if (hasWritten) {
                    loopHasWritten = true;
                }

            } else {
                if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING) && recordHasField(field, record)) {
                    writeAllTags(tagsToOpen, fieldName);
                    writer.writeEndElement();
                    loopHasWritten = true;
                }
            }
        }

        return loopHasWritten;
    }

    private boolean writeFieldForType(Deque<String> tagsToOpen, Object coercedValue, DataType dataType, String fieldName) throws XMLStreamException {
        switch (dataType.getFieldType()) {
            case BOOLEAN:
            case BYTE:
            case CHAR:
            case DECIMAL:
            case DOUBLE:
            case FLOAT:
            case INT:
            case LONG:
            case SHORT:
            case STRING: {
                writeAllTags(tagsToOpen, fieldName);
                writer.writeCharacters(coercedValue.toString());
                writer.writeEndElement();
                return true;
            }
            case DATE: {
                writeAllTags(tagsToOpen, fieldName);
                final String stringValue = DataTypeUtils.toString(coercedValue, LAZY_DATE_FORMAT);
                writer.writeCharacters(stringValue);
                writer.writeEndElement();
                return true;
            }
            case TIME: {
                writeAllTags(tagsToOpen, fieldName);
                final String stringValue = DataTypeUtils.toString(coercedValue, LAZY_TIME_FORMAT);
                writer.writeCharacters(stringValue);
                writer.writeEndElement();
                return true;
            }
            case TIMESTAMP: {
                writeAllTags(tagsToOpen, fieldName);
                final String stringValue = DataTypeUtils.toString(coercedValue, LAZY_TIMESTAMP_FORMAT);
                writer.writeCharacters(stringValue);
                writer.writeEndElement();
                return true;
            }
            case RECORD: {
                final Record record = (Record) coercedValue;
                final RecordDataType recordDataType = (RecordDataType) dataType;
                final RecordSchema childSchema = recordDataType.getChildSchema();
                tagsToOpen.addLast(fieldName);

                boolean hasWritten = iterateThroughRecordUsingSchema(tagsToOpen, record, childSchema);

                if (hasWritten) {
                    writer.writeEndElement();
                    return true;
                } else {

                    if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
                        writeAllTags(tagsToOpen);
                        writer.writeEndElement();
                        return true;
                    } else {
                        tagsToOpen.removeLast();
                        return false;
                    }
                }
            }
            case ARRAY: {
                final Object[] arrayValues;
                if (coercedValue instanceof Object[]) {
                    arrayValues = (Object[]) coercedValue;
                } else {
                    arrayValues = new Object[]{coercedValue.toString()};
                }

                final ArrayDataType arrayDataType = (ArrayDataType) dataType;
                final DataType elementType = arrayDataType.getElementType();

                final String elementName;
                final String wrapperName;
                if (arrayWrapping.equals(ArrayWrapping.USE_PROPERTY_FOR_ELEMENTS)) {
                    elementName = arrayTagName;
                    wrapperName = fieldName;
                } else if (arrayWrapping.equals(ArrayWrapping.USE_PROPERTY_AS_WRAPPER)) {
                    elementName = fieldName;
                    wrapperName = arrayTagName;
                } else {
                    elementName = fieldName;
                    wrapperName = null;
                }

                if (wrapperName!= null) {
                    tagsToOpen.addLast(wrapperName);
                }

                boolean loopHasWritten = false;
                for (Object element : arrayValues) {

                    final DataType chosenDataType = elementType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(element, (ChoiceDataType) elementType) : elementType;
                    final Object coercedElement = DataTypeUtils.convertType(element, chosenDataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, elementName);

                    if (coercedElement != null) {
                        boolean hasWritten = writeFieldForType(tagsToOpen, coercedElement, elementType, elementName);

                        if (hasWritten) {
                            loopHasWritten = true;
                        }

                    } else {
                        if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
                            writeAllTags(tagsToOpen, fieldName);
                            writer.writeEndElement();
                            loopHasWritten = true;
                        }
                    }
                }

                if (wrapperName!= null) {
                    if (loopHasWritten) {
                        writer.writeEndElement();
                        return true;
                    } else {
                        if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
                            writeAllTags(tagsToOpen);
                            writer.writeEndElement();
                            return true;
                        } else {
                            tagsToOpen.removeLast();
                            return false;
                        }
                    }
                } else {
                    return loopHasWritten;
                }
            }
            case MAP: {
                final MapDataType mapDataType = (MapDataType) dataType;
                final DataType valueDataType = mapDataType.getValueType();
                final Map<String,?> map = (Map<String,?>) coercedValue;

                tagsToOpen.addLast(fieldName);
                boolean loopHasWritten = false;

                for (Map.Entry<String,?> entry : map.entrySet()) {

                    final String key = entry.getKey();

                    final DataType chosenDataType = valueDataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(entry.getValue(),
                            (ChoiceDataType) valueDataType) : valueDataType;
                    final Object coercedElement = DataTypeUtils.convertType(entry.getValue(), chosenDataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, key);

                    if (coercedElement != null) {
                        boolean hasWritten = writeFieldForType(tagsToOpen, entry.getValue(), valueDataType, key);

                        if (hasWritten) {
                            loopHasWritten = true;
                        }
                    } else {
                        if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
                            writeAllTags(tagsToOpen, key);
                            writer.writeEndElement();
                            loopHasWritten = true;
                        }
                    }
                }

                if (loopHasWritten) {
                    writer.writeEndElement();
                    return true;
                } else {
                    if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
                        writeAllTags(tagsToOpen);
                        writer.writeEndElement();
                        return true;
                    } else {
                        tagsToOpen.removeLast();
                        return false;
                    }
                }
            }
            case CHOICE:
            default: {
                return writeUnknownField(tagsToOpen, coercedValue, fieldName);
            }
        }
    }

    private void writeAllTags(Deque<String> tagsToOpen, String fieldName) throws XMLStreamException {
        tagsToOpen.addLast(fieldName);
        writeAllTags(tagsToOpen);
    }

    private String escapeTagName(final String tagName) {
        return TAG_NAME_CHARS_TO_STRIP.matcher(tagName).replaceAll("");
    }

    private void writeAllTags(Deque<String> tagsToOpen) throws XMLStreamException {
        for (String tagName : tagsToOpen) {
            writer.writeStartElement(escapeTagName(tagName));
        }
        tagsToOpen.clear();
    }

    @Override
    public WriteResult writeRawRecord(Record record) throws IOException {

        if (!isActiveRecordSet()) {
            schemaAccess.writeHeader(recordSchema, getOutputStream());
        }

        checkWritingMultipleRecords();

        Deque<String> tagsToOpen = new ArrayDeque<>();

        try {
            tagsToOpen.addLast(recordTagName);

            boolean closingTagRequired = iterateThroughRecordWithoutSchema(tagsToOpen, record);
            if (closingTagRequired) {
                writer.writeEndElement();
                hasWrittenRecord = true;
            }

        } catch (XMLStreamException e) {
            throw new IOException(e.getMessage());
        }

        final Map<String, String> attributes = schemaAccess.getAttributes(recordSchema);
        return WriteResult.of(incrementRecordCount(), attributes);
    }

    private boolean iterateThroughRecordWithoutSchema(Deque<String> tagsToOpen, Record record) throws XMLStreamException {

        boolean loopHasWritten = false;

        for (String fieldName : record.getRawFieldNames()) {
            Object value = record.getValue(fieldName);

            if (value != null) {
                boolean hasWritten = writeUnknownField(tagsToOpen, value, fieldName);

                if (hasWritten) {
                    loopHasWritten = true;
                }
            } else {
                if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
                    writeAllTags(tagsToOpen, fieldName);
                    writer.writeEndElement();
                    loopHasWritten = true;
                }
            }
        }

        return loopHasWritten;
    }

    private boolean writeUnknownField(Deque<String> tagsToOpen, Object value, String fieldName) throws XMLStreamException {

        if (value instanceof Record) {
            Record valueAsRecord = (Record) value;
            tagsToOpen.addLast(fieldName);

            boolean hasWritten = iterateThroughRecordWithoutSchema(tagsToOpen, valueAsRecord);

            if (hasWritten) {
                writer.writeEndElement();
                return true;
            } else {
                if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
                    writeAllTags(tagsToOpen);
                    writer.writeEndElement();
                    return true;
                } else {
                    tagsToOpen.removeLast();
                    return false;
                }
            }
        }

        if (value instanceof Object[]) {
            Object[] valueAsArray = (Object[]) value;

            final String elementName;
            final String wrapperName;
            if (arrayWrapping.equals(ArrayWrapping.USE_PROPERTY_FOR_ELEMENTS)) {
                elementName = arrayTagName;
                wrapperName = fieldName;
            } else if (arrayWrapping.equals(ArrayWrapping.USE_PROPERTY_AS_WRAPPER)) {
                elementName = fieldName;
                wrapperName = arrayTagName;
            } else {
                elementName = fieldName;
                wrapperName = null;
            }

            if (wrapperName!= null) {
                tagsToOpen.addLast(wrapperName);
            }

            boolean loopHasWritten = false;

            for (Object element : valueAsArray) {
                if (element != null) {
                    boolean hasWritten = writeUnknownField(tagsToOpen, element, elementName);

                    if (hasWritten) {
                        loopHasWritten = true;
                    }

                } else {
                    if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
                        writeAllTags(tagsToOpen, fieldName);
                        writer.writeEndElement();
                        loopHasWritten = true;
                    }
                }
            }

            if (wrapperName!= null) {
                if (loopHasWritten) {
                    writer.writeEndElement();
                    return true;
                } else {
                    if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
                        writeAllTags(tagsToOpen);
                        writer.writeEndElement();
                        return true;
                    } else {
                        tagsToOpen.removeLast();
                        return false;
                    }
                }
            } else {
                return loopHasWritten;
            }
        }

        if (value instanceof Map) {
            Map<String, ?> valueAsMap = (Map<String, ?>) value;

            tagsToOpen.addLast(fieldName);
            boolean loopHasWritten = false;

            for (Map.Entry<String,?> entry : valueAsMap.entrySet()) {

                final String key = entry.getKey();
                final Object entryValue = entry.getValue();

                if (entryValue != null) {
                    boolean hasWritten = writeUnknownField(tagsToOpen, entry.getValue(), key);

                    if (hasWritten) {
                        loopHasWritten = true;
                    }
                } else {
                    if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
                        writeAllTags(tagsToOpen, key);
                        writer.writeEndElement();
                        loopHasWritten = true;
                    }
                }

            }

            if (loopHasWritten) {
                writer.writeEndElement();
                return true;
            } else {
                if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
                    writeAllTags(tagsToOpen);
                    writer.writeEndElement();
                    return true;
                } else {
                    tagsToOpen.removeLast();
                    return false;
                }
            }
        }

        writeAllTags(tagsToOpen, fieldName);
        writer.writeCharacters(value.toString());
        writer.writeEndElement();
        return true;
    }


    @Override
    public String getMimeType() {
        return "application/xml";
    }

    private boolean recordHasField(RecordField field, Record record) {
        Set<String> recordFieldNames = record.getRawFieldNames();
        if (recordFieldNames.contains(field.getFieldName())) {
            return true;
        }

        for (String alias : field.getAliases()) {
            if (recordFieldNames.contains(alias)) {
                return true;
            }
        }

        return false;
    }
}
