/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v1_0;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Section;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequence;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Data;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Footer;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;

/**
 * Meta data for an AMQP 1.0 message.
 * <p>
 * An instance holds the decoded header, delivery-annotations, message-annotations, properties,
 * application-properties and footer sections (each may be absent) together with the buffers
 * from which the storable form of the meta data is produced.
 * <p>
 * The buffers held by an instance are released by {@link #dispose()}.
 */
public class MessageMetaData_1_0 implements StorableMessageMetaData
{
    private static final Logger _logger = LoggerFactory.getLogger(MessageMetaData_1_0.class);
    // TODO move to somewhere more useful
    public static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
    public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory();
    private static final MessageMetaDataType_1_0 TYPE = new MessageMetaDataType_1_0();
    /** Message annotation consulted by {@link MessageHeader_1_0#getNotValidBefore()}. */
    private static final Symbol NOT_VALID_BEFORE = Symbol.valueOf("x-qpid-not-valid-before");


    // Decoded sections; each is null when the corresponding section was not supplied.
    private final Header _header;
    private final Properties _properties;
    private final Map _deliveryAnnotations;
    private final Map _messageAnnotations;
    private final Map _appProperties;
    private final Map _footer;

    /** Buffers holding the encoded form; set to null by {@link #dispose()}. */
    private volatile List<QpidByteBuffer> _encodedSections;

    /** Lazily created concatenation of {@link #_encodedSections}; null until first needed and after dispose. */
    private volatile QpidByteBuffer _encoded;
    private final MessageHeader_1_0 _messageHeader;


    /**
     * Creates meta data from already decoded sections, encoding each of them with the given encoder.
     *
     * @param sections the sections, expected in the order header, delivery-annotations,
     *                 message-annotations, properties, application-properties, footer
     *                 (any of which may be missing)
     * @param encoder  encoder used to produce the encoded form of each section; it is reset after each section
     */
    public MessageMetaData_1_0(List<Section> sections, SectionEncoder encoder)
    {
        this(sections, encodeSections(sections, encoder));
    }

    /**
     * @return the properties section, or null if the message had none
     */
    public Properties getPropertiesSection()
    {
        return _properties;
    }


    /**
     * @return the header section, or null if the message had none
     */
    public Header getHeaderSection()
    {
        return _header;
    }

    /**
     * Encodes every section individually.
     *
     * @param sections the sections to encode
     * @param encoder  the encoder to use; reset after each section so that it can be reused for the next
     * @return one buffer per section, in the order of {@code sections}
     */
    private static List<QpidByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder)
    {
        final List<QpidByteBuffer> encodedSections = new ArrayList<>(sections.size());
        for(Section section : sections)
        {
            encoder.encodeObject(section);
            encodedSections.add(QpidByteBuffer.wrap(encoder.getEncoding().asByteBuffer()));
            encoder.reset();
        }
        return encodedSections;
    }

    /**
     * Creates meta data by decoding the sections contained in the given fragments.
     *
     * @param fragments buffers which together hold the encoded message; they are duplicated, not consumed
     * @param decoder   decoder used to read the sections
     * @throws IllegalArgumentException if the decoder reports an {@link AmqpErrorException}
     */
    public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder)
    {
        this(fragments, decoder, new ArrayList<QpidByteBuffer>(3));
    }

    /**
     * Creates meta data by decoding the sections contained in the given fragments.
     *
     * @param fragments         buffers which together hold the encoded message; they are duplicated, not consumed
     * @param decoder           decoder used to read the sections
     * @param immutableSections list that becomes the encoded form of this instance. Despite its name the list
     *                          is modified: a duplicate of every fragment is appended to it
     * @throws IllegalArgumentException if the decoder reports an {@link AmqpErrorException}
     */
    public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder, List<QpidByteBuffer> immutableSections)
    {
        this(constructSections(fragments, decoder,immutableSections), immutableSections);
    }

    /**
     * Common constructor: picks the known sections out of {@code sections} and keeps {@code encodedSections}
     * as the encoded form.
     * <p>
     * Sections are only recognised in the order header, delivery-annotations, message-annotations, properties,
     * application-properties, footer. A section that appears out of that order, or of any other type,
     * stops the matching of everything that follows it.
     */
    private MessageMetaData_1_0(List<Section> sections, List<QpidByteBuffer> encodedSections)
    {
        _encodedSections = encodedSections;

        Header header = null;
        Properties properties = null;
        Map deliveryAnnotations = null;
        Map messageAnnotations = null;
        Map appProperties = null;
        Map footer = null;

        final Iterator<Section> sectIter = sections.iterator();

        Section section = nextOrNull(sectIter);
        if(section instanceof Header)
        {
            header = (Header) section;
            section = nextOrNull(sectIter);
        }

        if(section instanceof DeliveryAnnotations)
        {
            deliveryAnnotations = ((DeliveryAnnotations) section).getValue();
            section = nextOrNull(sectIter);
        }

        if(section instanceof MessageAnnotations)
        {
            messageAnnotations = ((MessageAnnotations) section).getValue();
            section = nextOrNull(sectIter);
        }

        if(section instanceof Properties)
        {
            properties = (Properties) section;
            section = nextOrNull(sectIter);
        }

        if(section instanceof ApplicationProperties)
        {
            appProperties = ((ApplicationProperties) section).getValue();
            section = nextOrNull(sectIter);
        }

        if(section instanceof Footer)
        {
            footer = ((Footer) section).getValue();
        }

        _header = header;
        _properties = properties;
        _deliveryAnnotations = deliveryAnnotations;
        _messageAnnotations = messageAnnotations;
        _appProperties = appProperties;
        _footer = footer;

        _messageHeader = new MessageHeader_1_0();
    }

    /**
     * @return the next element of the iterator, or null when it is exhausted
     */
    private static <T> T nextOrNull(final Iterator<T> iterator)
    {
        return iterator.hasNext() ? iterator.next() : null;
    }

    /**
     * Decodes the sections held in {@code fragments}.
     * <p>
     * Header, delivery-annotations, message-annotations, properties, application-properties and footer
     * sections are returned; body sections (a single amqp-value, or a run of data or amqp-sequence
     * sections) are read past but not returned. On success a duplicate of every fragment is appended to
     * {@code encodedSections}.
     *
     * @param fragments       buffers which together hold the encoded message
     * @param decoder         decoder used to read the sections
     * @param encodedSections list receiving a duplicate of each fragment
     * @return the decoded non-body sections in the order they were read
     * @throws IllegalArgumentException if the decoder reports an {@link AmqpErrorException}
     */
    private static List<Section> constructSections(final QpidByteBuffer[] fragments, final SectionDecoder decoder, List<QpidByteBuffer> encodedSections)
    {
        final List<Section> sections = new ArrayList<>(3);
        final QpidByteBuffer src = combineFragments(fragments);

        try
        {
            // The first read is unconditional; what happens for an empty buffer is up to the decoder.
            Section s = decoder.readSection(src);

            if(s instanceof Header)
            {
                sections.add(s);
                s = readSectionIfAvailable(src, decoder);
            }

            if(s instanceof DeliveryAnnotations)
            {
                sections.add(s);
                s = readSectionIfAvailable(src, decoder);
            }

            if(s instanceof MessageAnnotations)
            {
                sections.add(s);
                s = readSectionIfAvailable(src, decoder);
            }

            if(s instanceof Properties)
            {
                sections.add(s);
                s = readSectionIfAvailable(src, decoder);
            }

            if(s instanceof ApplicationProperties)
            {
                sections.add(s);
                s = readSectionIfAvailable(src, decoder);
            }

            // Body sections are skipped: they are not part of the returned list.
            if(s instanceof AmqpValue)
            {
                s = readSectionIfAvailable(src, decoder);
            }
            else if(s instanceof Data)
            {
                do
                {
                    s = readSectionIfAvailable(src, decoder);
                }
                while(s instanceof Data);
            }
            else if(s instanceof AmqpSequence)
            {
                do
                {
                    s = readSectionIfAvailable(src, decoder);
                }
                while(s instanceof AmqpSequence);
            }

            if(s instanceof Footer)
            {
                sections.add(s);
            }

            for(QpidByteBuffer buf : fragments)
            {
                encodedSections.add(buf.duplicate());
            }

            return sections;
        }
        catch (AmqpErrorException e)
        {
            _logger.error("Decoding read section error", e);
            throw new IllegalArgumentException(e);
        }
        finally
        {
            // src is always a buffer created by combineFragments, never one of the caller's fragments
            src.dispose();
        }
    }

    /**
     * Produces a single readable buffer covering all fragments. The caller must dispose the result.
     *
     * @return a duplicate of the only fragment, or a newly allocated direct buffer holding a copy of the
     *         remaining bytes of every fragment
     */
    private static QpidByteBuffer combineFragments(final QpidByteBuffer[] fragments)
    {
        if(fragments.length == 1)
        {
            return fragments[0].duplicate();
        }

        int size = 0;
        for(QpidByteBuffer buf : fragments)
        {
            size += buf.remaining();
        }

        final QpidByteBuffer combined = QpidByteBuffer.allocateDirect(size);
        boolean complete = false;
        try
        {
            for(QpidByteBuffer buf : fragments)
            {
                // copy from a duplicate so that the position of the caller's fragment is left untouched
                final QpidByteBuffer duplicate = buf.duplicate();
                try
                {
                    combined.put(duplicate);
                }
                finally
                {
                    duplicate.dispose();
                }
            }
            combined.flip();
            complete = true;
            return combined;
        }
        finally
        {
            if(!complete)
            {
                // do not leak the freshly allocated buffer when copying fails
                combined.dispose();
            }
        }
    }

    /**
     * @return the next section read from {@code src}, or null when {@code src} has no bytes remaining
     */
    private static Section readSectionIfAvailable(final QpidByteBuffer src, final SectionDecoder decoder)
            throws AmqpErrorException
    {
        return src.hasRemaining() ? decoder.readSection(src) : null;
    }


    /**
     * @return the meta data type shared by all instances of this class
     */
    public MessageMetaDataType getType()
    {
        return TYPE;
    }


    /**
     * Returns the sum of the limits of the encoded section buffers.
     * <p>
     * NOTE(review): this uses {@code limit()} rather than {@code remaining()}, which only matches the number
     * of bytes actually copied by the encoding step if every section buffer has position 0 - confirm.
     *
     * @return the size, in bytes, reserved for the storable form
     */
    public int getStorableSize()
    {
        int size = 0;

        for(QpidByteBuffer bin : _encodedSections)
        {
            size += bin.limit();
        }

        return size;
    }

    /**
     * Concatenates the encoded sections into a newly allocated direct buffer.
     *
     * @return the combined buffer, flipped so that it is ready for reading; owned by the caller
     */
    private QpidByteBuffer encodeAsBuffer()
    {
        final QpidByteBuffer buf = QpidByteBuffer.allocateDirect(getStorableSize());
        boolean complete = false;
        try
        {
            for(QpidByteBuffer bin : _encodedSections)
            {
                final QpidByteBuffer duplicate = bin.duplicate();
                try
                {
                    buf.put(duplicate);
                }
                finally
                {
                    duplicate.dispose();
                }
            }
            buf.flip();
            complete = true;
            return buf;
        }
        finally
        {
            if(!complete)
            {
                buf.dispose();
            }
        }
    }

    /**
     * Returns the cached combined encoding, creating it on first use.
     * <p>
     * No locking is performed; the cached buffer is published through the volatile field only.
     */
    private QpidByteBuffer getOrCreateEncoded()
    {
        QpidByteBuffer encoded = _encoded;
        if(encoded == null)
        {
            encoded = encodeAsBuffer();
            _encoded = encoded;
        }
        return encoded;
    }

    /**
     * Copies the encoded form into {@code dest}, truncating it to the space remaining in {@code dest}.
     *
     * @param dest the buffer to write into
     * @return the number of bytes written
     */
    public int writeToBuffer(QpidByteBuffer dest)
    {
        // work on a duplicate so that position/limit of the cached buffer are never changed
        final QpidByteBuffer buf = getOrCreateEncoded().duplicate();
        try
        {
            buf.position(0);

            if(dest.remaining() < buf.limit())
            {
                buf.limit(dest.remaining());
            }
            final int length = buf.limit();
            dest.putCopyOf(buf);
            return length;
        }
        finally
        {
            buf.dispose();
        }
    }

    /**
     * Returns the number of bytes remaining in the cached combined encoding, creating it if necessary.
     * <p>
     * NOTE(review): the value is the size of the encoded sections held by this object, not of any message
     * body - confirm that this is what callers expect.
     */
    public int getContentSize()
    {
        return getOrCreateEncoded().remaining();
    }

    /**
     * @return true only if a header section is present and its durable flag is {@link Boolean#TRUE}
     */
    public boolean isPersistent()
    {
        return _header != null && Boolean.TRUE.equals(_header.getDurable());
    }

    /**
     * @return the header view backed by the sections of this meta data
     */
    public MessageHeader_1_0 getMessageHeader()
    {
        return _messageHeader;
    }

    /**
     * Releases every buffer held by this object.
     * <p>
     * Safe to call when the combined encoding was never created, and safe to call more than once.
     * After this call the encoded sections are no longer available.
     */
    @Override
    public void dispose()
    {
        final List<QpidByteBuffer> encodedSections = _encodedSections;
        if(encodedSections != null)
        {
            for(QpidByteBuffer bin : encodedSections)
            {
                bin.dispose();
            }
            _encodedSections = null;
        }

        // the combined buffer only exists once writeToBuffer or getContentSize has been called
        final QpidByteBuffer encoded = _encoded;
        if(encoded != null)
        {
            encoded.dispose();
            _encoded = null;
        }
    }

    /**
     * Does nothing in this implementation.
     */
    @Override
    public void clearEncodedForm()
    {

    }

    /**
     * Factory recreating {@link MessageMetaData_1_0} instances from their stored form.
     */
    private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_1_0>
    {
        private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance();

        private MetaDataFactory()
        {
            _typeRegistry.registerTransportLayer();
            _typeRegistry.registerMessagingLayer();
            _typeRegistry.registerTransactionLayer();
            _typeRegistry.registerSecurityLayer();
        }

        /**
         * Parses sections from {@code buf} until it has no bytes remaining. For every section a slice of
         * {@code buf} limited to the bytes of that section is kept as its encoded form.
         *
         * @param buf the buffer holding the stored meta data; its position is advanced by parsing
         * @return the recreated meta data
         * @throws ConnectionScopedRuntimeException if parsing reports an {@link AmqpErrorException}
         */
        public MessageMetaData_1_0 createMetaData(QpidByteBuffer buf)
        {
            final ValueHandler valueHandler = new ValueHandler(_typeRegistry);

            final List<Section> sections = new ArrayList<>(3);
            final List<QpidByteBuffer> encodedSections = new ArrayList<>(3);

            boolean created = false;
            try
            {
                while(buf.hasRemaining())
                {
                    final int start = buf.position();
                    final QpidByteBuffer encodedBuf = buf.slice();
                    // track the slice before parsing so that it is released should parsing fail
                    encodedSections.add(encodedBuf);

                    final Object parse = valueHandler.parse(buf);
                    sections.add((Section) parse);
                    encodedBuf.limit(buf.position()-start);
                }

                final MessageMetaData_1_0 metaData = new MessageMetaData_1_0(sections, encodedSections);
                created = true;
                return metaData;
            }
            catch (AmqpErrorException e)
            {
                //TODO
                throw new ConnectionScopedRuntimeException(e);
            }
            finally
            {
                if(!created)
                {
                    // nothing took ownership of the slices, so release them here
                    for(QpidByteBuffer slice : encodedSections)
                    {
                        slice.dispose();
                    }
                }
            }
        }
    }

    /**
     * View of the enclosing meta data as an {@link AMQMessageHeader}. All values are read from the
     * sections of the enclosing instance.
     */
    public class MessageHeader_1_0 implements AMQMessageHeader
    {

        /**
         * @return the string form of the correlation id of the properties section, or null if absent
         */
        public String getCorrelationId()
        {
            final Object correlationId = _properties == null ? null : _properties.getCorrelationId();
            return correlationId == null ? null : correlationId.toString();
        }

        /**
         * @return always 0 (not yet implemented)
         */
        public long getExpiration()
        {
            return 0;  //TODO
        }

        /**
         * @return the string form of the message id of the properties section, or null if absent
         */
        public String getMessageId()
        {
            final Object messageId = _properties == null ? null : _properties.getMessageId();
            return messageId == null ? null : messageId.toString();
        }

        /**
         * @return the string form of the content type of the properties section, or null if absent
         */
        public String getMimeType()
        {
            final Object contentType = _properties == null ? null : _properties.getContentType();
            return contentType == null ? null : contentType.toString();
        }

        /**
         * @return always null (not yet implemented)
         */
        public String getEncoding()
        {
            return null;  //TODO
        }

        /**
         * @return the priority of the header section, or 4 if there is no header or no priority
         */
        public byte getPriority()
        {
            if(_header == null || _header.getPriority() == null)
            {
                return 4; //javax.jms.Message.DEFAULT_PRIORITY;
            }
            return _header.getPriority().byteValue();
        }

        /**
         * @return the creation time of the properties section as returned by its {@code getTime()},
         *         or 0 if absent
         */
        public long getTimestamp()
        {
            if(_properties == null || _properties.getCreationTime() == null)
            {
                return 0L;
            }
            return _properties.getCreationTime().getTime();
        }


        /**
         * @return the value of the {@code x-qpid-not-valid-before} message annotation if it is a
         *         {@link Number}, otherwise 0
         */
        @Override
        public long getNotValidBefore()
        {
            if(_messageAnnotations == null)
            {
                return 0L;
            }
            final Object annotation = _messageAnnotations.get(NOT_VALID_BEFORE);
            return annotation instanceof Number ? ((Number) annotation).longValue() : 0L;
        }

        /**
         * @return the subject if there is one; otherwise the string form of the {@link #JMS_TYPE}
         *         message annotation, or null if that is absent too
         */
        public String getType()
        {
            final String subject = getSubject();
            if(subject != null)
            {
                return subject;
            }

            // Use legacy annotation if present and there was no subject
            final Object jmsType = _messageAnnotations == null ? null : _messageAnnotations.get(JMS_TYPE);
            return jmsType == null ? null : jmsType.toString();
        }

        /**
         * @return the reply-to of the properties section, or null if absent
         */
        public String getReplyTo()
        {
            return _properties == null ? null : _properties.getReplyTo();
        }

        /**
         * @return always null (not yet implemented)
         */
        public String getAppId()
        {
            //TODO
            return null;
        }

        /**
         * @return always null (not yet implemented)
         */
        public String getUserId()
        {
            // TODO
            return null;
        }

        /**
         * @param name the application property to look up
         * @return the value of the application property, or null if there is no such property
         */
        public Object getHeader(final String name)
        {
            return _appProperties == null ? null : _appProperties.get(name);
        }

        /**
         * @param names the application property names to check
         * @return true if every name is a key of the application properties; false if any is missing or
         *         if there are no application properties at all
         */
        public boolean containsHeaders(final Set<String> names)
        {
            if(_appProperties == null)
            {
                return false;
            }

            for(String key : names)
            {
                if(!_appProperties.containsKey(key))
                {
                    return false;
                }
            }
            return true;
        }

        /**
         * @return an unmodifiable view of the application property names; empty if there are none
         */
        @Override
        public Collection<String> getHeaderNames()
        {
            if(_appProperties == null)
            {
                return Collections.emptySet();
            }
            return Collections.unmodifiableCollection(_appProperties.keySet());
        }

        /**
         * @param name the application property name to check
         * @return true if the application properties exist and contain the name
         */
        public boolean containsHeader(final String name)
        {
            return _appProperties != null && _appProperties.containsKey(name);
        }

        /**
         * @return the subject of the properties section, or null if absent
         */
        public String getSubject()
        {
            return _properties == null ? null : _properties.getSubject();
        }

        /**
         * @return the "to" field of the properties section, or null if absent
         */
        public String getTo()
        {
            return _properties == null ? null : _properties.getTo();
        }

        /**
         * @return a new, modifiable map holding a copy of the application properties; empty if there are none
         */
        public Map<String, Object> getHeadersAsMap()
        {
            return _appProperties == null ? new HashMap<String,Object>() : new HashMap<String,Object>(_appProperties);
        }
    }

}
