/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.atlas.typesystem.types;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.persistence.Id;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/**
 * Given a IReferenceableInstance, a Walker will traverse the Object Graph
 * reachable form the instance. It will invoke the process call on the provided NodeProcessor
 * for each non-primitive attribute (Structs, Traits, References, Arrays of Non-Primitives, Maps
 * of Non-Primitives)
 */
public class ObjectGraphWalker {

    final Queue<IReferenceableInstance> queue;
    final TypeSystem typeSystem;
    final NodeProcessor nodeProcessor;
    Set<Id> processedIds;

    public ObjectGraphWalker(TypeSystem typeSystem, NodeProcessor nodeProcessor) throws AtlasException {
        this(typeSystem, nodeProcessor, (IReferenceableInstance) null);
    }

    public ObjectGraphWalker(TypeSystem typeSystem, NodeProcessor nodeProcessor, IReferenceableInstance start)
    throws AtlasException {
        this.typeSystem = typeSystem;
        this.nodeProcessor = nodeProcessor;
        queue = new LinkedList<>();
        processedIds = new HashSet<>();
        if (start != null) {
            visitReferenceableInstance(start);
        }
    }

    public ObjectGraphWalker(TypeSystem typeSystem, NodeProcessor nodeProcessor,
            List<? extends IReferenceableInstance> roots) throws AtlasException {
        this.typeSystem = typeSystem;
        this.nodeProcessor = nodeProcessor;
        queue = new LinkedList<IReferenceableInstance>();
        processedIds = new HashSet<Id>();
        for (IReferenceableInstance r : roots) {
            visitReferenceableInstance(r);
        }
    }

    public void walk() throws AtlasException {
        while (!queue.isEmpty()) {
            IReferenceableInstance r = queue.poll();
            if(r != null) {
                processReferenceableInstance(r);
            }
        }
    }

    public void addRoot(IReferenceableInstance root) {
        visitReferenceableInstance(root);
    }

    void traverseValue(IDataType dT, Object val) throws AtlasException {
        if (val != null) {
            if (dT.getTypeCategory() == DataTypes.TypeCategory.ARRAY) {
                IDataType elemType = ((DataTypes.ArrayType) dT).getElemType();
                visitCollection(elemType, val);
            } else if (dT.getTypeCategory() == DataTypes.TypeCategory.MAP) {
                IDataType keyType = ((DataTypes.MapType) dT).getKeyType();
                IDataType valueType = ((DataTypes.MapType) dT).getValueType();
                visitMap(keyType, valueType, val);
            } else if (dT.getTypeCategory() == DataTypes.TypeCategory.STRUCT
                    || dT.getTypeCategory() == DataTypes.TypeCategory.TRAIT) {
                visitStruct(val);
            } else if (dT.getTypeCategory() == DataTypes.TypeCategory.CLASS) {
                visitReferenceableInstance(val);
            }
        }
    }

    void visitMap(IDataType keyType, IDataType valueType, Object val) throws AtlasException {
        if (keyType.getTypeCategory() == DataTypes.TypeCategory.PRIMITIVE
                && valueType.getTypeCategory() == DataTypes.TypeCategory.PRIMITIVE) {
            return;
        }

        if (val != null) {
            Iterator<Map.Entry> it = null;
            if (Map.class.isAssignableFrom(val.getClass())) {
                it = ((Map) val).entrySet().iterator();
                ImmutableMap.Builder b = ImmutableMap.builder();
                while (it.hasNext()) {
                    Map.Entry e = it.next();
                    traverseValue(keyType, e.getKey());
                    traverseValue(valueType, e.getValue());
                }
            }
        }
    }

    void visitCollection(IDataType elemType, Object val) throws AtlasException {

        if (elemType.getTypeCategory() == DataTypes.TypeCategory.PRIMITIVE) {
            return;
        }

        if (val != null) {
            Iterator it = null;
            if (val instanceof Collection) {
                it = ((Collection) val).iterator();
            } else if (val instanceof Iterable) {
                it = ((Iterable) val).iterator();
            } else if (val instanceof Iterator) {
                it = (Iterator) val;
            }
            if (it != null) {
                DataTypes.TypeCategory elemCategory = elemType.getTypeCategory();
                while (it.hasNext()) {
                    Object elem = it.next();
                    traverseValue(elemType, elem);
                }
            }
        }
    }

    void visitStruct(Object val) throws AtlasException {

        if (val == null || !(val instanceof IStruct)) {
            return;
        }

        IStruct i = (IStruct) val;

        IConstructableType type = typeSystem.getDataType(IConstructableType.class, i.getTypeName());

        for (Map.Entry<String, AttributeInfo> e : type.fieldMapping().fields.entrySet()) {
            AttributeInfo aInfo = e.getValue();
            String attrName = e.getKey();
            if (aInfo.dataType().getTypeCategory() != DataTypes.TypeCategory.PRIMITIVE) {
                Object aVal = i.get(attrName);
                nodeProcessor.processNode(new Node(i, attrName, aInfo, aVal));
                traverseValue(aInfo.dataType(), aVal);
            }
        }
    }

    void visitReferenceableInstance(Object val) {

        if (val == null || !(val instanceof IReferenceableInstance)) {
            return;
        }

        IReferenceableInstance ref = (IReferenceableInstance) val;

        if (!processedIds.contains(ref.getId())) {
            processedIds.add(ref.getId());
            if (!(ref instanceof Id)) {
                queue.add(ref);
            }
        }
    }

    void processReferenceableInstance(IReferenceableInstance ref) throws AtlasException {

        nodeProcessor.processNode(new Node(ref, null, null, null));
        visitStruct(ref);
        ImmutableList<String> traits = ref.getTraits();
        for (String trait : traits) {
            visitStruct(ref.getTrait(trait));
        }
    }

    public interface NodeProcessor {

        void processNode(Node nd) throws AtlasException;
    }

    /**
     * Represents a non-primitive value of an instance.
     */
    public static class Node {
        public final IStruct instance;
        public final String attributeName;
        public final AttributeInfo aInfo;
        public final Object value;

        public Node(IStruct instance, String attributeName, AttributeInfo aInfo, Object value) {
            this.instance = instance;
            this.attributeName = attributeName;
            this.aInfo = aInfo;
            this.value = value;
        }

        @Override
        public String toString(){
            StringBuilder string = new StringBuilder().append(instance).append(aInfo).append(value);
            return string.toString();
        }
    }
}
