/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.provenance;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.Relationship;

/**
 * Holder for provenance relevant information
 * <p/>
 * @author none
 */
public final class StandardProvenanceEventRecord implements ProvenanceEventRecord {

    private final long eventTime;
    private final long entryDate;
    private final ProvenanceEventType eventType;
    private final long lineageStartDate;
    private final Set<String> lineageIdentifiers;
    private final String componentId;
    private final String componentType;
    private final String transitUri;
    private final String sourceSystemFlowFileIdentifier;
    private final String uuid;
    private final List<String> parentUuids;
    private final List<String> childrenUuids;
    private final String alternateIdentifierUri;
    private final String details;
    private final String relationship;
    private final long storageByteOffset;
    private final String storageFilename;
    private final long eventDuration;

    private final String contentClaimSection;
    private final String contentClaimContainer;
    private final String contentClaimIdentifier;
    private final Long contentClaimOffset;
    private final long contentSize;

    private final String previousClaimSection;
    private final String previousClaimContainer;
    private final String previousClaimIdentifier;
    private final Long previousClaimOffset;
    private final Long previousSize;

    private final String sourceQueueIdentifier;

    private final Map<String, String> previousAttributes;
    private final Map<String, String> updatedAttributes;

    private volatile long eventId;

    private StandardProvenanceEventRecord(final Builder builder) {
        this.eventTime = builder.eventTime;
        this.entryDate = builder.entryDate;
        this.eventType = builder.eventType;
        this.componentId = builder.componentId;
        this.componentType = builder.componentType;
        this.transitUri = builder.transitUri;
        this.sourceSystemFlowFileIdentifier = builder.sourceSystemFlowFileIdentifier;
        this.uuid = builder.uuid;
        this.parentUuids = builder.parentUuids;
        this.childrenUuids = builder.childrenUuids;
        this.alternateIdentifierUri = builder.alternateIdentifierUri;
        this.details = builder.details;
        this.relationship = builder.relationship;
        this.storageByteOffset = builder.storageByteOffset;
        this.storageFilename = builder.storageFilename;
        this.eventDuration = builder.eventDuration;
        this.lineageStartDate = builder.lineageStartDate;
        this.lineageIdentifiers = Collections.unmodifiableSet(builder.lineageIdentifiers);

        previousClaimSection = builder.previousClaimSection;
        previousClaimContainer = builder.previousClaimContainer;
        previousClaimIdentifier = builder.previousClaimIdentifier;
        previousClaimOffset = builder.previousClaimOffset;
        previousSize = builder.previousSize;

        contentClaimSection = builder.contentClaimSection;
        contentClaimContainer = builder.contentClaimContainer;
        contentClaimIdentifier = builder.contentClaimIdentifier;
        contentClaimOffset = builder.contentClaimOffset;
        contentSize = builder.contentSize;

        previousAttributes = builder.previousAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.previousAttributes);
        updatedAttributes = builder.updatedAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes);

        sourceQueueIdentifier = builder.sourceQueueIdentifier;

    }

    public String getStorageFilename() {
        return storageFilename;
    }

    public long getStorageByteOffset() {
        return storageByteOffset;
    }

    void setEventId(final long eventId) {
        this.eventId = eventId;
    }

    @Override
    public long getEventId() {
        return eventId;
    }

    @Override
    public long getEventTime() {
        return eventTime;
    }

    @Override
    public Set<String> getLineageIdentifiers() {
        return lineageIdentifiers;
    }

    @Override
    public long getLineageStartDate() {
        return lineageStartDate;
    }

    @Override
    public long getFileSize() {
        return contentSize;
    }

    @Override
    public Long getPreviousFileSize() {
        return previousSize;
    }

    @Override
    public ProvenanceEventType getEventType() {
        return eventType;
    }

    @Override
    public String getAttribute(final String attributeName) {
        if ( updatedAttributes.containsKey(attributeName) ) {
            return updatedAttributes.get(attributeName);
        }
        
        return previousAttributes.get(attributeName);
    }
    
    @Override
    public Map<String, String> getAttributes() {
        final Map<String, String> allAttrs = new HashMap<>(previousAttributes.size() + updatedAttributes.size());
        allAttrs.putAll(previousAttributes);
        for (final Map.Entry<String, String> entry : updatedAttributes.entrySet()) {
            if (entry.getValue() != null) {
                allAttrs.put(entry.getKey(), entry.getValue());
            }
        }
        return allAttrs;
    }

    @Override
    public String getComponentId() {
        return componentId;
    }

    @Override
    public String getComponentType() {
        return componentType;
    }

    @Override
    public String getTransitUri() {
        return transitUri;
    }

    @Override
    public String getSourceSystemFlowFileIdentifier() {
        return sourceSystemFlowFileIdentifier;
    }

    @Override
    public String getFlowFileUuid() {
        return uuid;
    }

    @Override
    public List<String> getParentUuids() {
        return parentUuids == null ? Collections.<String>emptyList() : parentUuids;
    }

    @Override
    public List<String> getChildUuids() {
        return childrenUuids == null ? Collections.<String>emptyList() : childrenUuids;
    }

    @Override
    public String getAlternateIdentifierUri() {
        return alternateIdentifierUri;
    }

    @Override
    public long getEventDuration() {
        return eventDuration;
    }

    @Override
    public String getDetails() {
        return details;
    }

    @Override
    public String getRelationship() {
        return relationship;
    }

    @Override
    public long getFlowFileEntryDate() {
        return entryDate;
    }

    @Override
    public String getContentClaimSection() {
        return contentClaimSection;
    }

    @Override
    public String getContentClaimContainer() {
        return contentClaimContainer;
    }

    @Override
    public String getContentClaimIdentifier() {
        return contentClaimIdentifier;
    }

    @Override
    public Long getContentClaimOffset() {
        return contentClaimOffset;
    }

    @Override
    public String getSourceQueueIdentifier() {
        return sourceQueueIdentifier;
    }

    @Override
    public Map<String, String> getPreviousAttributes() {
        return previousAttributes;
    }

    @Override
    public String getPreviousContentClaimContainer() {
        return previousClaimContainer;
    }

    @Override
    public String getPreviousContentClaimIdentifier() {
        return previousClaimIdentifier;
    }

    @Override
    public Long getPreviousContentClaimOffset() {
        return previousClaimOffset;
    }

    @Override
    public String getPreviousContentClaimSection() {
        return previousClaimSection;
    }

    @Override
    public Map<String, String> getUpdatedAttributes() {
        return updatedAttributes;
    }

    @Override
    public int hashCode() {
        final int eventTypeCode;
        if (eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.JOIN || eventType == ProvenanceEventType.FORK) {
            eventTypeCode = 1472;
        } else if (eventType == ProvenanceEventType.REPLAY) {
            eventTypeCode = 21479 + (int) (0x7FFFFFFF & eventTime); // use lower bits of event time.
        } else {
            eventTypeCode = 4812 + eventType.hashCode() + 4 * uuid.hashCode();
        }

        return -37423 + 3 * componentId.hashCode() + (transitUri == null ? 0 : 41 * transitUri.hashCode())
                + (relationship == null ? 0 : 47 * relationship.hashCode()) + 44 * eventTypeCode;
    }

    @Override
    public boolean equals(final Object obj) {
        if (obj == null) {
            return false;
        }
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof StandardProvenanceEventRecord)) {
            return false;
        }

        final StandardProvenanceEventRecord other = (StandardProvenanceEventRecord) obj;
        // If event ID's are populated and not equal, return false. If they have not yet been populated, do not
        // use them in the comparison.
        if (eventId > 0L && other.getEventId() > 0L && eventId != other.getEventId()) {
            return false;
        }
        if (eventType != other.eventType) {
            return false;
        }

        if (!componentId.equals(other.componentId)) {
            return false;
        }

        if (different(parentUuids, other.parentUuids)) {
            return false;
        }

        if (different(childrenUuids, other.childrenUuids)) {
            return false;
        }

        // SPAWN had issues indicating which should be the event's FlowFileUUID in the case that there is 1 parent and 1 child.
        if (!uuid.equals(other.uuid)) {
            return false;
        }

        if (different(transitUri, other.transitUri)) {
            return false;
        }

        if (different(relationship, other.relationship)) {
            return false;
        }

        return !(eventType == ProvenanceEventType.REPLAY && eventTime != other.getEventTime());
    }

    private boolean different(final Object a, final Object b) {
        if (a == null && b == null) {
            return false;
        }
        if (a == null || b == null) {
            return true;
        }

        return !a.equals(b);
    }

    private boolean different(final List<String> a, final List<String> b) {
        if (a == null && b == null) {
            return false;
        }

        if (a == null && b != null) {
            return true;
        }

        if (a != null && b == null) {
            return true;
        }

        if (a.size() != b.size()) {
            return true;
        }

        final List<String> sortedA = new ArrayList<>(a);
        final List<String> sortedB = new ArrayList<>(b);

        Collections.sort(sortedA);
        Collections.sort(sortedB);

        for (int i = 0; i < sortedA.size(); i++) {
            if (!sortedA.get(i).equals(sortedB.get(i))) {
                return true;
            }
        }

        return false;
    }

    @Override
    public String toString() {
        return "ProvenanceEventRecord ["
                + "eventId=" + eventId
                + ", eventType=" + eventType
                + ", eventTime=" + new Date(eventTime)
                + ", uuid=" + uuid
                + ", fileSize=" + contentSize
                + ", componentId=" + componentId
                + ", transitUri=" + transitUri
                + ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier
                + ", parentUuids=" + parentUuids
                + ", alternateIdentifierUri=" + alternateIdentifierUri + "]";
    }

    public static class Builder implements ProvenanceEventBuilder {

        private long eventTime = System.currentTimeMillis();
        private long entryDate;
        private long lineageStartDate;
        private Set<String> lineageIdentifiers = new HashSet<>();
        private ProvenanceEventType eventType = null;
        private String componentId = null;
        private String componentType = null;
        private String sourceSystemFlowFileIdentifier = null;
        private String transitUri = null;
        private String uuid = null;
        private List<String> parentUuids = null;
        private List<String> childrenUuids = null;
        private String alternateIdentifierUri = null;
        private String details = null;
        private String relationship = null;
        private long storageByteOffset = -1L;
        private long eventDuration = -1L;
        private String storageFilename;

        private String contentClaimSection;
        private String contentClaimContainer;
        private String contentClaimIdentifier;
        private Long contentClaimOffset;
        private Long contentSize;

        private String previousClaimSection;
        private String previousClaimContainer;
        private String previousClaimIdentifier;
        private Long previousClaimOffset;
        private Long previousSize;

        private String sourceQueueIdentifier;

        private Map<String, String> previousAttributes;
        private Map<String, String> updatedAttributes;

        @Override
        public Builder fromEvent(final ProvenanceEventRecord event) {
            eventTime = event.getEventTime();
            entryDate = event.getFlowFileEntryDate();
            lineageStartDate = event.getLineageStartDate();
            lineageIdentifiers = event.getLineageIdentifiers();
            eventType = event.getEventType();
            componentId = event.getComponentId();
            componentType = event.getComponentType();
            transitUri = event.getTransitUri();
            sourceSystemFlowFileIdentifier = event.getSourceSystemFlowFileIdentifier();
            uuid = event.getFlowFileUuid();
            parentUuids = event.getParentUuids();
            childrenUuids = event.getChildUuids();
            alternateIdentifierUri = event.getAlternateIdentifierUri();
            eventDuration = event.getEventDuration();
            previousAttributes = event.getPreviousAttributes();
            updatedAttributes = event.getUpdatedAttributes();
            details = event.getDetails();
            relationship = event.getRelationship();

            contentClaimSection = event.getContentClaimSection();
            contentClaimContainer = event.getContentClaimContainer();
            contentClaimIdentifier = event.getContentClaimIdentifier();
            contentClaimOffset = event.getContentClaimOffset();
            contentSize = event.getFileSize();

            previousClaimSection = event.getPreviousContentClaimSection();
            previousClaimContainer = event.getPreviousContentClaimContainer();
            previousClaimIdentifier = event.getPreviousContentClaimIdentifier();
            previousClaimOffset = event.getPreviousContentClaimOffset();
            previousSize = event.getPreviousFileSize();

            sourceQueueIdentifier = event.getSourceQueueIdentifier();

            if (event instanceof StandardProvenanceEventRecord) {
                final StandardProvenanceEventRecord standardProvEvent = (StandardProvenanceEventRecord) event;
                storageByteOffset = standardProvEvent.storageByteOffset;
                storageFilename = standardProvEvent.storageFilename;
            }

            return this;
        }

        @Override
        public Builder setFlowFileEntryDate(final long entryDate) {
            this.entryDate = entryDate;
            return this;
        }

        @Override
        public Builder setLineageIdentifiers(final Set<String> lineageIdentifiers) {
            this.lineageIdentifiers = lineageIdentifiers;
            return this;
        }

        @Override
        public Builder setAttributes(final Map<String, String> previousAttributes, final Map<String, String> updatedAttributes) {
            this.previousAttributes = previousAttributes;
            this.updatedAttributes = updatedAttributes;
            return this;
        }

        @Override
        public Builder setFlowFileUUID(final String uuid) {
            this.uuid = uuid;
            return this;
        }

        public Builder setStorageLocation(final String filename, final long offset) {
            this.storageFilename = filename;
            this.storageByteOffset = offset;
            return this;
        }

        @Override
        public Builder setEventTime(long eventTime) {
            this.eventTime = eventTime;
            return this;
        }

        @Override
        public Builder setEventDuration(final long millis) {
            this.eventDuration = millis;
            return this;
        }

        @Override
        public Builder setLineageStartDate(final long startDate) {
            this.lineageStartDate = startDate;
            return this;
        }

        public Builder addLineageIdentifier(final String lineageIdentifier) {
            this.lineageIdentifiers.add(lineageIdentifier);
            return this;
        }

        @Override
        public Builder setEventType(ProvenanceEventType eventType) {
            this.eventType = eventType;
            return this;
        }

        @Override
        public Builder setComponentId(String componentId) {
            this.componentId = componentId;
            return this;
        }

        @Override
        public Builder setComponentType(String componentType) {
            this.componentType = componentType;
            return this;
        }

        @Override
        public Builder setSourceSystemFlowFileIdentifier(String sourceSystemFlowFileIdentifier) {
            this.sourceSystemFlowFileIdentifier = sourceSystemFlowFileIdentifier;
            return this;
        }

        @Override
        public Builder setTransitUri(String transitUri) {
            this.transitUri = transitUri;
            return this;
        }

        @Override
        public Builder addParentFlowFile(final FlowFile parentFlowFile) {
            if (this.parentUuids == null) {
                this.parentUuids = new ArrayList<>();
            }
            this.parentUuids.add(parentFlowFile.getAttribute(CoreAttributes.UUID.key()));
            return this;
        }

        public Builder addParentUuid(final String uuid) {
            if (this.parentUuids == null) {
                this.parentUuids = new ArrayList<>();
            }
            this.parentUuids.add(uuid);
            return this;
        }

        @Override
        public Builder removeParentFlowFile(final FlowFile parentFlowFile) {
            if (this.parentUuids == null) {
                return this;
            }

            parentUuids.remove(parentFlowFile.getAttribute(CoreAttributes.UUID.key()));
            return this;
        }

        @Override
        public Builder addChildFlowFile(final FlowFile childFlowFile) {
            if (this.childrenUuids == null) {
                this.childrenUuids = new ArrayList<>();
            }
            this.childrenUuids.add(childFlowFile.getAttribute(CoreAttributes.UUID.key()));
            return this;
        }

        public Builder addChildUuid(final String uuid) {
            if (this.childrenUuids == null) {
                this.childrenUuids = new ArrayList<>();
            }
            this.childrenUuids.add(uuid);
            return this;
        }

        @Override
        public Builder removeChildFlowFile(final FlowFile childFlowFile) {
            if (this.childrenUuids == null) {
                return this;
            }

            childrenUuids.remove(childFlowFile.getAttribute(CoreAttributes.UUID.key()));
            return this;
        }

        @Override
        public Builder setAlternateIdentifierUri(String alternateIdentifierUri) {
            this.alternateIdentifierUri = alternateIdentifierUri;
            return this;
        }

        @Override
        public Builder setDetails(String details) {
            this.details = details;
            return this;
        }

        @Override
        public Builder setRelationship(Relationship relationship) {
            this.relationship = relationship.getName();
            return this;
        }

        public Builder setRelationship(final String relationship) {
            this.relationship = relationship;
            return this;
        }

        @Override
        public ProvenanceEventBuilder fromFlowFile(final FlowFile flowFile) {
            setFlowFileEntryDate(flowFile.getEntryDate());
            setLineageIdentifiers(flowFile.getLineageIdentifiers());
            setLineageStartDate(flowFile.getLineageStartDate());
            setAttributes(Collections.<String, String>emptyMap(), flowFile.getAttributes());
            uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
            this.contentSize = flowFile.getSize();
            return this;
        }

        @Override
        public Builder setPreviousContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) {
            previousClaimSection = section;
            previousClaimContainer = container;
            previousClaimIdentifier = identifier;
            previousClaimOffset = offset;
            previousSize = size;
            return this;
        }

        @Override
        public Builder setCurrentContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) {
            contentClaimSection = section;
            contentClaimContainer = container;
            contentClaimIdentifier = identifier;
            contentClaimOffset = offset;
            contentSize = size;
            return this;
        }

        @Override
        public Builder setSourceQueueIdentifier(final String identifier) {
            sourceQueueIdentifier = identifier;
            return this;
        }

        private void assertSet(final Object value, final String name) {
            if (value == null) {
                throw new IllegalStateException("Cannot create Provenance Event Record because " + name + " is not set");
            }
        }

        public ProvenanceEventType getEventType() {
            return eventType;
        }

        public List<String> getChildUuids() {
            return Collections.unmodifiableList(childrenUuids);
        }

        public List<String> getParentUuids() {
            return Collections.unmodifiableList(parentUuids);
        }

        @Override
        public StandardProvenanceEventRecord build() {
            assertSet(eventType, "Event Type");
            assertSet(componentId, "Component ID");
            assertSet(componentType, "Component Type");
            assertSet(uuid, "FlowFile UUID");
            assertSet(contentSize, "FlowFile Size");

            switch (eventType) {
                case ADDINFO:
                    if (alternateIdentifierUri == null) {
                        throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no alternate identifiers have been set");
                    }
                    break;
                case RECEIVE:
                case SEND:
                    assertSet(transitUri, "Transit URI");
                    break;
                case ROUTE:
                    assertSet(relationship, "Relationship");
                    break;
                case CLONE:
                case FORK:
                case JOIN:
                    if ((parentUuids == null || parentUuids.isEmpty()) && (childrenUuids == null || childrenUuids.isEmpty())) {
                        throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no Parent UUIDs or Children UUIDs have been set");
                    }
                    break;
                default:
                    break;
            }

            return new StandardProvenanceEventRecord(this);
        }
    }
}
