/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration;

import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.MapConfiguration;
import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.Parameterizing;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.process.traversal.step.Mutating;
import org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.AddEdgeStartStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.AddEdgeStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.AddVertexStartStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.AddVertexStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeOtherVertexStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaMapStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertiesStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertyMapStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.AddPropertyStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.Parameters;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.PropertyType;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
 * {@code PartitionStrategy} partitions the vertices, edges and vertex properties of a graph into String named
 * partitions (i.e. buckets, subgraphs, etc.).  It blinds a {@link Traversal} from "seeing" specified areas of
 * the graph given the partition names assigned to {@link Builder#readPartitions(String...)}.  The traversal will
 * ignore all graph elements not in those "read" partitions.
 *
 * @author Stephen Mallette (http://stephen.genoprime.com)
 * @author Marko A. Rodriguez (http://markorodriguez.com)
 */
public final class PartitionStrategy extends AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy> implements TraversalStrategy.DecorationStrategy {
    private String writePartition;
    private final String partitionKey;
    private final Set<String> readPartitions;
    private final boolean includeMetaProperties;

    private PartitionStrategy(final Builder builder) {
        this.writePartition = builder.writePartition;
        this.partitionKey = builder.partitionKey;
        this.readPartitions = Collections.unmodifiableSet(builder.readPartitions);
        this.includeMetaProperties = builder.includeMetaProperties;
    }

    public String getWritePartition() {
        return this.writePartition;
    }

    public String getPartitionKey() {
        return this.partitionKey;
    }

    public Set<String> getReadPartitions() {
        return readPartitions;
    }

    public boolean isIncludeMetaProperties() {
        return includeMetaProperties;
    }

    public static Builder build() {
        return new Builder();
    }

    @Override
    public void apply(final Traversal.Admin<?, ?> traversal) {
        final Graph graph = traversal.getGraph().orElseThrow(() -> new IllegalStateException("PartitionStrategy does not work with anonymous Traversals"));
        final Graph.Features.VertexFeatures vertexFeatures = graph.features().vertex();
        final boolean supportsMetaProperties = vertexFeatures.supportsMetaProperties();
        if (includeMetaProperties && !supportsMetaProperties)
            throw new IllegalStateException("PartitionStrategy is configured to include meta-properties but the Graph does not support them");

        // no need to add has after mutating steps because we want to make it so that the write partition can
        // be independent of the read partition.  in other words, i don't need to be able to read from a partition
        // in order to write to it.
        final List<Step> stepsToInsertHasAfter = new ArrayList<>();
        stepsToInsertHasAfter.addAll(TraversalHelper.getStepsOfAssignableClass(GraphStep.class, traversal));
        stepsToInsertHasAfter.addAll(TraversalHelper.getStepsOfAssignableClass(VertexStep.class, traversal));
        stepsToInsertHasAfter.addAll(TraversalHelper.getStepsOfAssignableClass(EdgeOtherVertexStep.class, traversal));
        stepsToInsertHasAfter.addAll(TraversalHelper.getStepsOfAssignableClass(EdgeVertexStep.class, traversal));

        // all steps that return a vertex need to have has(partitionKey,within,partitionValues) injected after it
        stepsToInsertHasAfter.forEach(step -> TraversalHelper.insertAfterStep(
                new HasStep(traversal, new HasContainer(partitionKey, P.within(new ArrayList<>(readPartitions)))), step, traversal));

        if (includeMetaProperties) {
            final List<PropertiesStep> propertiesSteps = TraversalHelper.getStepsOfAssignableClass(PropertiesStep.class, traversal);
            propertiesSteps.forEach(step -> {
                // check length first because keyExists will return true otherwise
                if (step.getPropertyKeys().length > 0 && ElementHelper.keyExists(partitionKey, step.getPropertyKeys()))
                    throw new IllegalStateException("Cannot explicitly request the partitionKey in the traversal");

                if (step.getReturnType() == PropertyType.PROPERTY) {
                    // check the following step to see if it is a has(partitionKey, *) - if so then this strategy was
                    // already applied down below via g.V().values() which injects a properties() step
                    final Step next = step.getNextStep();
                    if (!(next instanceof HasStep) || !((HasContainer) ((HasStep) next).getHasContainers().get(0)).getKey().equals(partitionKey)) {
                        // use choose() to determine if the properties() step is called on a Vertex to get a VertexProperty
                        // if not, pass it through.
                        final Traversal choose = __.choose(
                                __.filter(new TypeChecker<>(VertexProperty.class)),
                                __.has(partitionKey, P.within(new ArrayList<>(readPartitions))),
                                __.__()).filter(new PartitionKeyHider());
                        TraversalHelper.insertTraversal(step, choose.asAdmin(), traversal);
                    }
                } else if (step.getReturnType() == PropertyType.VALUE) {
                    // use choose() to determine if the values() step is called on a Vertex to get a VertexProperty
                    // if not, pass it through otherwise explode g.V().values() to g.V().properties().has().value()
                    final Traversal choose = __.choose(
                            __.filter(new TypeChecker<>(Vertex.class)),
                            __.properties(step.getPropertyKeys()).has(partitionKey, P.within(new ArrayList<>(readPartitions))).filter(new PartitionKeyHider()).value(),
                            __.__().filter(new PartitionKeyHider()));
                    TraversalHelper.insertTraversal(step, choose.asAdmin(), traversal);
                    traversal.removeStep(step);
                } else {
                    throw new IllegalStateException(String.format("%s is not accounting for a particular %s %s",
                            PartitionStrategy.class.getSimpleName(), PropertyType.class.toString(), step.getReturnType()));
                }
            });

            final List<PropertyMapStep> propertyMapSteps = TraversalHelper.getStepsOfAssignableClass(PropertyMapStep.class, traversal);
            propertyMapSteps.forEach(step -> {
                // check length first because keyExists will return true otherwise
                if (step.getPropertyKeys().length > 0 && ElementHelper.keyExists(partitionKey, step.getPropertyKeys()))
                    throw new IllegalStateException("Cannot explicitly request the partitionKey in the traversal");

                if (step.getReturnType() == PropertyType.PROPERTY) {
                    // via map() filter out properties that aren't in the partition if it is a PropertyVertex,
                    // otherwise just let them pass through
                    TraversalHelper.insertAfterStep(new LambdaMapStep<>(traversal, new MapPropertiesFilter()), step, traversal);
                } else if (step.getReturnType() == PropertyType.VALUE) {
                    // as this is a value map, replace that step with propertiesMap() that returns PropertyType.VALUE.
                    // from there, add the filter as shown above and then unwrap the properties as they would have
                    // been done under valueMap()
                    final PropertyMapStep propertyMapStep = new PropertyMapStep(traversal, step.isIncludeTokens(), PropertyType.PROPERTY, step.getPropertyKeys());
                    TraversalHelper.replaceStep(step, propertyMapStep, traversal);

                    final LambdaMapStep mapPropertiesFilterStep = new LambdaMapStep<>(traversal, new MapPropertiesFilter());
                    TraversalHelper.insertAfterStep(mapPropertiesFilterStep, propertyMapStep, traversal);
                    TraversalHelper.insertAfterStep(new LambdaMapStep<>(traversal, new MapPropertiesConverter()), mapPropertiesFilterStep, traversal);
                } else {
                    throw new IllegalStateException(String.format("%s is not accounting for a particular %s %s",
                            PartitionStrategy.class.getSimpleName(), PropertyType.class.toString(), step.getReturnType()));
                }
            });
        }

        final List<Step> stepsToInsertPropertyMutations = traversal.getSteps().stream().filter(step ->
                step instanceof AddEdgeStep || step instanceof AddVertexStep ||
                step instanceof AddEdgeStartStep || step instanceof AddVertexStartStep ||
                (includeMetaProperties && step instanceof AddPropertyStep)
        ).collect(Collectors.toList());

        stepsToInsertPropertyMutations.forEach(step -> {
            // note that with AddPropertyStep we just add the partition key/value regardless of whether this
            // ends up being a Vertex or not.  AddPropertyStep currently chooses to simply not bother
            // to use the additional "property mutations" if the Element being mutated is a Edge or
            // VertexProperty
            ((Mutating) step).addPropertyMutations(partitionKey, writePartition);

            if (includeMetaProperties) {
                // GraphTraversal folds g.addV().property('k','v') to just AddVertexStep/AddVertexStartStep so this
                // has to be exploded back to g.addV().property(cardinality, 'k','v','partition','A')
                if (step instanceof AddVertexStartStep || step instanceof AddVertexStep) {
                    final Parameters parameters = ((Parameterizing) step).getParameters();
                    final Map<Object, List<Object>> params = parameters.getRaw();
                    params.forEach((k, v) -> {

                        // need to filter out T based keys
                        if (k instanceof String) {
                            final List<Step> addPropertyStepsToAppend = new ArrayList<>(v.size());
                            final VertexProperty.Cardinality cardinality = vertexFeatures.getCardinality((String) k);
                            v.forEach(o -> {
                                final AddPropertyStep addPropertyStep = new AddPropertyStep(traversal, cardinality, k, o);
                                addPropertyStep.addPropertyMutations(partitionKey, writePartition);
                                addPropertyStepsToAppend.add(addPropertyStep);

                                // need to remove the parameter from the AddVertex/StartStep because it's now being added
                                // via the AddPropertyStep
                                parameters.remove(k);
                            });

                            Collections.reverse(addPropertyStepsToAppend);
                            addPropertyStepsToAppend.forEach(s -> TraversalHelper.insertAfterStep(s, step, traversal));
                        }
                    });
                }
            }
        });
    }

    /**
     * A concrete lambda implementation that checks if the type passing through on the {@link Traverser} is
     * of a specific {@link Element} type.
     */
    public final class TypeChecker<A> implements Predicate<Traverser<A>>, Serializable {
        final Class<? extends Element> toCheck;

        public TypeChecker(final Class<? extends Element> toCheck) {
            this.toCheck = toCheck;
        }

        @Override
        public boolean test(final Traverser traverser) {
            return toCheck.isAssignableFrom(traverser.get().getClass());
        }

        @Override
        public String toString() {
            return "instanceOf(" + toCheck.getSimpleName() + ")";
        }
    }

    /**
     * A concrete lambda implementation that filters out the partition key so that it isn't visible when making
     * calls to {@link GraphTraversal#valueMap}.
     */
    public final class PartitionKeyHider<A extends Property> implements Predicate<Traverser<A>>, Serializable {
        @Override
        public boolean test(final Traverser<A> traverser) {
            return !traverser.get().key().equals(partitionKey);
        }

        @Override
        public String toString() {
            return "remove(" + partitionKey + ")";
        }
    }

    /**
     * Takes the result of a {@link Map} containing {@link Property} lists and if the property is a
     * {@link VertexProperty} it applies a filter based on the current partitioning.  If is not a
     * {@link VertexProperty} the property is simply passed through.
     */
    public final class MapPropertiesFilter implements Function<Traverser<Map<String, List<Property>>>, Map<String, List<Property>>>, Serializable {
        @Override
        public Map<String, List<Property>> apply(final Traverser<Map<String, List<Property>>> mapTraverser) {
            final Map<String, List<Property>> values = mapTraverser.get();
            final Map<String, List<Property>> filtered = new HashMap<>();

            // note the final filter that removes the partitionKey from the outgoing Map
            values.entrySet().forEach(p -> {
                final List l = p.getValue().stream().filter(property -> {
                    if (property instanceof VertexProperty) {
                        final Iterator<String> itty = ((VertexProperty) property).values(partitionKey);
                        return itty.hasNext() && readPartitions.contains(itty.next());
                    } else {
                        return true;
                    }
                }).filter(property -> !property.key().equals(partitionKey)).collect(Collectors.toList());
                if (l.size() > 0) filtered.put(p.getKey(), l);
            });

            return filtered;
        }

        @Override
        public String toString() {
            return "applyPartitionFilter";
        }
    }

    /**
     * Takes a {@link Map} of a {@link List} of {@link Property} objects and unwraps the {@link Property#value()}.
     */
    public final class MapPropertiesConverter implements Function<Traverser<Map<String, List<Property>>>, Map<String, List<Property>>>, Serializable {
        @Override
        public Map<String, List<Property>> apply(final Traverser<Map<String, List<Property>>> mapTraverser) {
            final Map<String, List<Property>> values = mapTraverser.get();
            final Map<String, List<Property>> converted = new HashMap<>();

            values.entrySet().forEach(p -> {
                final List l = p.getValue().stream().map(property -> property.value()).collect(Collectors.toList());
                converted.put(p.getKey(), l);
            });

            return converted;
        }

        @Override
        public String toString() {
            return "extractValuesInPropertiesMap";
        }
    }

    @Override
    public Configuration getConfiguration() {
        final Map<String, Object> map = new HashMap<>();
        map.put(STRATEGY, PartitionStrategy.class.getCanonicalName());
        map.put(INCLUDE_META_PROPERTIES, this.includeMetaProperties);
        if (null != this.writePartition)
            map.put(WRITE_PARTITION, this.writePartition);
        if (null != this.readPartitions)
            map.put(READ_PARTITIONS, this.readPartitions);
        if (null != this.partitionKey)
            map.put(PARTITION_KEY, this.partitionKey);
        return new MapConfiguration(map);
    }

    public static final String INCLUDE_META_PROPERTIES = "includeMetaProperties";
    public static final String WRITE_PARTITION = "writePartition";
    public static final String PARTITION_KEY = "partitionKey";
    public static final String READ_PARTITIONS = "readPartitions";

    public static PartitionStrategy create(final Configuration configuration) {
        final PartitionStrategy.Builder builder = PartitionStrategy.build();
        if (configuration.containsKey(INCLUDE_META_PROPERTIES))
            builder.includeMetaProperties(configuration.getBoolean(INCLUDE_META_PROPERTIES));
        if (configuration.containsKey(WRITE_PARTITION))
            builder.writePartition(configuration.getString(WRITE_PARTITION));
        if (configuration.containsKey(PARTITION_KEY))
            builder.partitionKey(configuration.getString(PARTITION_KEY));
        if (configuration.containsKey(READ_PARTITIONS))
            builder.readPartitions(new ArrayList((Collection)configuration.getProperty(READ_PARTITIONS)));
        return builder.create();
    }

    public final static class Builder {
        private String writePartition;
        private String partitionKey;
        private Set<String> readPartitions = new HashSet<>();
        private boolean includeMetaProperties = false;

        Builder() {
        }

        /**
         * Set to {@code true} if the {@link VertexProperty} instances should get assigned to partitions.  This
         * has the effect of hiding properties within a particular partition so that in order for the
         * {@link VertexProperty} to be seen both the parent {@link Vertex} and the {@link VertexProperty} must have
         * readable partitions defined in the strategy.
         * <p/>
         * When setting this to {@code true} (it is {@code false} by default) it is important that the {@link Graph}
         * support the meta-properties feature.  If it does not errors will ensue.
         */
        public Builder includeMetaProperties(final boolean includeMetaProperties) {
            this.includeMetaProperties = includeMetaProperties;
            return this;
        }

        /**
         * Specifies the name of the partition to write when adding vertices, edges and vertex properties.  This
         * name can be any user defined value.  It is only possible to write to a single partition at a time.
         */
        public Builder writePartition(final String writePartition) {
            this.writePartition = writePartition;
            return this;
        }

        /**
         * Specifies the partition key name.  This is the property key that contains the partition value. It
         * may a good choice to index on this key in certain cases (in graphs that support such things). This
         * value must be specified for the {@code PartitionStrategy} to be constructed properly.
         */
        public Builder partitionKey(final String partitionKey) {
            this.partitionKey = partitionKey;
            return this;
        }

        /**
         * Specifies the partition of the graph to read from.  It is possible to assign multiple partition keys so
         * as to read from multiple partitions at the same time.
         */
        public Builder readPartitions(final List<String> readPartitions) {
            this.readPartitions.addAll(readPartitions);
            return this;
        }

        /**
         * Specifies the partition of the graph to read from.  It is possible to assign multiple partition keys so
         * as to read from multiple partitions at the same time.
         */
        public Builder readPartitions(final String... readPartitions) {
            return this.readPartitions(Arrays.asList(readPartitions));
        }

        /**
         * Specifies the partition of the graph to read from.  It is possible to assign multiple partition keys so
         * as to read from multiple partitions at the same time.
         *
         * @deprecated As of release 3.2.3, replaced by {@link Builder#readPartitions(List)}.
         */
        @Deprecated
        public Builder addReadPartition(final String readPartition) {
            this.readPartitions.add(readPartition);
            return this;
        }

        /**
         * Creates the {@code PartitionStrategy}.
         */
        public PartitionStrategy create() {
            if (partitionKey == null || partitionKey.isEmpty())
                throw new IllegalStateException("The partitionKey cannot be null or empty");

            return new PartitionStrategy(this);
        }
    }
}
