/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sling.pipes.internal;

import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceUtil;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.pipes.BasePipe;
import org.apache.sling.pipes.ExecutionResult;
import org.apache.sling.pipes.OutputWriter;
import org.apache.sling.pipes.Pipe;
import org.apache.sling.pipes.PipeBuilder;
import org.apache.sling.pipes.Plumber;
import org.apache.sling.pipes.internal.inputstream.CsvPipe;
import org.apache.sling.pipes.internal.inputstream.JsonPipe;
import org.apache.sling.pipes.internal.inputstream.RegexpPipe;
import org.apache.sling.pipes.internal.slingquery.ChildrenPipe;
import org.apache.sling.pipes.internal.slingquery.ClosestPipe;
import org.apache.sling.pipes.internal.slingquery.FindPipe;
import org.apache.sling.pipes.internal.slingquery.ParentPipe;
import org.apache.sling.pipes.internal.slingquery.ParentsPipe;
import org.apache.sling.pipes.internal.slingquery.SiblingsPipe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.apache.sling.jcr.resource.JcrResourceConstants.NT_SLING_FOLDER;
import static org.apache.sling.jcr.resource.JcrResourceConstants.NT_SLING_ORDERED_FOLDER;
import static org.apache.sling.jcr.resource.JcrResourceConstants.SLING_RESOURCE_TYPE_PROPERTY;
import static org.apache.sling.pipes.internal.ManifoldPipe.PN_NUM_THREADS;

import static org.apache.sling.pipes.internal.CommandUtil.checkArguments;
import static org.apache.sling.pipes.internal.CommandUtil.writeToMap;
/**
 * Implementation of the PipeBuilder interface.
 * <p>
 * Builder calls are accumulated in memory: one container {@link Step} plus an ordered list of sub-pipe
 * steps. Nothing is written until one of the {@code build} or {@code run} methods is called, at which
 * point the steps are persisted as resources and the resulting pipe is obtained from the {@link Plumber}.
 */
public class PipeBuilderImpl implements PipeBuilder {
    private static final Logger logger = LoggerFactory.getLogger(PipeBuilderImpl.class);

    /** Root path under which pipes built without an explicit path are persisted. */
    public static final String PIPES_REPOSITORY_PATH = "/var/pipes";

    /** Node names given, in order, to the first unnamed steps; later steps are named by their index. */
    public static final String[] DEFAULT_NAMES = new String[]{"one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"};

    /** Sub-pipe steps in declaration order; stays {@code null} until the first call to {@link #pipe(String)}. */
    List<Step> steps;

    /** Writer configuration set through {@link #outputs(String...)}, {@code null} when none was given. */
    Map outputs;

    /** Step describing the root (container) resource of the pipe. */
    Step containerStep = new Step(ContainerPipe.RESOURCE_TYPE);

    /** Step that configuration calls currently apply to; the container until a sub-pipe is added. */
    Step currentStep = containerStep;

    Plumber plumber;

    ResourceResolver resolver;

    /**
     * protected constructor (to only allow internal classes to build it out)
     * @param resolver resolver with which the pipe will be built and executed
     * @param plumber instance of the plumber
     */
    protected PipeBuilderImpl(ResourceResolver resolver, Plumber plumber){
        this.plumber = plumber;
        this.resolver = resolver;
    }

    /**
     * Appends a new step of the given type and makes it the current step.
     * @param type resource type of the pipe to add
     * @return updated instance of PipeBuilder
     * @throws IllegalArgumentException if the plumber does not know that type
     */
    @Override
    public PipeBuilder pipe(String type){
        if (!plumber.isTypeRegistered(type)){
            throw new IllegalArgumentException(type + " is not a registered pipe type");
        }
        if (steps == null){
            steps = new ArrayList<>();
        }
        currentStep = new Step(type);
        steps.add(currentStep);
        return this;
    }

    /**
     * internal utility to glob pipe configuration &amp; expression configuration.
     * An {@link IllegalAccessException} raised while setting the expression is logged, not propagated;
     * the step has already been added by then.
     * @param type pipe type
     * @param expr expression
     * @return updated instance of PipeBuilder
     */
    protected PipeBuilder pipeWithExpr(String type, String expr){
        try {
            pipe(type).expr(expr);
        } catch (IllegalAccessException e){
            logger.error("exception while configuring {}", type, e);
        }
        return this;
    }

    @Override
    public PipeBuilder mv(String expr) {
        return pipeWithExpr(MovePipe.RESOURCE_TYPE, expr);
    }

    @Override
    public PipeBuilder write(Object... conf) throws IllegalAccessException {
        return pipe(WritePipe.RESOURCE_TYPE).conf(conf);
    }

    @Override
    public PipeBuilder grep(Object... conf) throws IllegalAccessException {
        return pipe(FilterPipe.RESOURCE_TYPE).conf(conf);
    }

    @Override
    public PipeBuilder auth(Object... conf) throws IllegalAccessException {
        return pipe(AuthorizablePipe.RESOURCE_TYPE).conf(conf);
    }

    @Override
    public PipeBuilder xpath(String expr) {
        return pipeWithExpr(XPathPipe.RESOURCE_TYPE, expr);
    }

    @Override
    public PipeBuilder children(String expr) {
        return pipeWithExpr(ChildrenPipe.RESOURCE_TYPE, expr);
    }

    @Override
    public PipeBuilder rm() {
        return pipe(RemovePipe.RESOURCE_TYPE);
    }

    @Override
    public PipeBuilder traverse() {
        return pipe(TraversePipe.RESOURCE_TYPE);
    }

    @Override
    public PipeBuilder csv(String expr) {
        return pipeWithExpr(CsvPipe.RESOURCE_TYPE, expr);
    }

    @Override
    public PipeBuilder json(String expr) {
        return pipeWithExpr(JsonPipe.RESOURCE_TYPE, expr);
    }

    @Override
    public PipeBuilder egrep(String expr) {
        return pipeWithExpr(RegexpPipe.RESOURCE_TYPE, expr);
    }

    @Override
    public PipeBuilder mkdir(String expr) {
        return pipeWithExpr(PathPipe.RESOURCE_TYPE, expr);
    }

    /**
     * Adds a base pipe step configured with the given path.
     * A failure to set the path is logged, not propagated.
     */
    @Override
    public PipeBuilder echo(String path) {
        try {
            pipe(BasePipe.RESOURCE_TYPE).path(path);
        } catch(IllegalAccessException e){
            logger.error("error when calling echo {}", path, e);
        }
        return this;
    }

    @Override
    public PipeBuilder parent() {
        return pipe(ParentPipe.RESOURCE_TYPE);
    }

    @Override
    public PipeBuilder parents(String expr) {
        return pipeWithExpr(ParentsPipe.RESOURCE_TYPE, expr);
    }

    @Override
    public PipeBuilder siblings(String expr) {
        return pipeWithExpr(SiblingsPipe.RESOURCE_TYPE, expr);
    }

    @Override
    public PipeBuilder closest(String expr) {
        return pipeWithExpr(ClosestPipe.RESOURCE_TYPE, expr);
    }

    @Override
    public PipeBuilder $(String expr) {
        return pipeWithExpr(FindPipe.RESOURCE_TYPE, expr);
    }

    @Override
    public PipeBuilder ref(String expr) {
        return pipeWithExpr(ReferencePipe.RESOURCE_TYPE, expr);
    }

    @Override
    public PipeBuilder mp() {
        return pipe(MultiPropertyPipe.RESOURCE_TYPE);
    }

    /**
     * Adds a package pipe step with the given expression and the filter collection mode flag set.
     * A failure to set that flag is logged, not propagated.
     */
    @Override
    public PipeBuilder pkg(String expr) {
        try {
            pipeWithExpr(PackagePipe.RESOURCE_TYPE, expr).with(PackagePipe.PN_FILTERCOLLECTIONMODE, true);
        } catch (IllegalAccessException e) {
            logger.error("error when calling pkg", e);
        }
        return this;
    }

    @Override
    public PipeBuilder not(String expr) {
        return pipeWithExpr(NotPipe.RESOURCE_TYPE, expr);
    }

    /**
     * Writes the given key/value pairs as properties of the current step itself.
     */
    @Override
    public PipeBuilder with(Object... params) throws IllegalAccessException {
        return writeToCurrentStep(null, params);
    }

    /**
     * Writes the given key/value pairs into the current step's {@code Pipe.NN_CONF} configuration node.
     */
    @Override
    public PipeBuilder conf(Object... properties) throws IllegalAccessException {
        return writeToCurrentStep(Pipe.NN_CONF, properties);
    }

    @Override
    public PipeBuilder acls() throws IllegalAccessException{
        return pipe(ACLPipe.RESOURCE_TYPE);
    }

    @Override
    public PipeBuilder allow(String expr) throws IllegalAccessException{
        return pipeWithExpr(ACLPipe.RESOURCE_TYPE, expr).with("allow","true");
    }

    @Override
    public PipeBuilder deny(String expr) throws IllegalAccessException{
        return pipeWithExpr(ACLPipe.RESOURCE_TYPE, expr).with("deny","true");
    }

    /**
     * Add some configurations to current's Step node defined by name (if null, will be step's properties)
     * @param name name of the configuration node, can be null in which case it's the subpipe itself
     * @param params key/value pair list of configuration
     * @return updated instance of PipeBuilder
     * @throws IllegalAccessException in case configuration is wrong
     */
    protected PipeBuilder writeToCurrentStep(String name, Object... params) throws IllegalAccessException {
        // validation is delegated to CommandUtil (presumably rejects malformed key/value lists)
        checkArguments(params);
        Map props = name != null ? currentStep.confs.get(name) : currentStep.properties;
        if (props == null){
            // first write to that configuration node: create and register its map
            props = new HashMap();
            if (name != null){
                currentStep.confs.put(name, props);
            }
        }
        writeToMap(props, params);
        return this;
    }

    @Override
    public PipeBuilder expr(String value) throws IllegalAccessException {
        return this.with(Pipe.PN_EXPR, value);
    }

    @Override
    public PipeBuilder path(String value) throws IllegalAccessException {
        return this.with(Pipe.PN_PATH, value);
    }

    /**
     * Sets the node name under which the current step will be persisted.
     */
    @Override
    public PipeBuilder name(String name) throws IllegalAccessException {
        currentStep.name = name;
        return this;
    }

    /**
     * build a time + random based path under /var/pipes
     * @return full path of future Pipe
     */
    protected String buildRandomPipePath() {
        final Calendar now = Calendar.getInstance();
        // NOTE: Calendar.MONTH is zero-based, so the month segment ranges from 0 to 11.
        // Kept as is so that the layout of generated paths does not change.
        return PIPES_REPOSITORY_PATH + '/' + now.get(Calendar.YEAR) + '/' + now.get(Calendar.MONTH) + '/' + now.get(Calendar.DAY_OF_MONTH) + "/"
                + UUID.randomUUID().toString();
    }

    /**
     * Create a configuration resource (no commit is performed here)
     * @param resolver current resolver
     * @param path path of the resource
     * @param type type of the node to be created
     * @param data map of properties to add
     * @throws PersistenceException in case configuration resource couldn't be persisted
     * @return resource created
     */
    protected Resource createResource(ResourceResolver resolver, String path, String type, Map data) throws PersistenceException {
        return ResourceUtil.getOrCreateResource(resolver, path, data, type, false);
    }

    /**
     * Replaces the writer configuration with the given keys.
     * @param keys values handed to {@code CommandUtil.writeToMap} (presumably key/value pairs)
     * @return updated instance of PipeBuilder
     */
    @Override
    public PipeBuilder outputs(String... keys) {
        outputs = new HashMap();
        writeToMap(outputs, keys);
        return this;
    }

    /**
     * Builds the pipe under a generated path, see {@link #buildRandomPipePath()}.
     */
    @Override
    public Pipe build() throws PersistenceException {
        return build(buildRandomPipePath());
    }

    /**
     * Persist a step at a given path
     * @param path path into which step should be persisted
     * @param parentType type of the parent resource
     * @param step step to persist
     * @return created resource
     * @throws PersistenceException in case persistence fails
     */
    protected Resource persistStep(String path, String parentType, Step step) throws PersistenceException {
        Resource resource = createResource(resolver, path, parentType, step.properties);
        // each named configuration map becomes a child node of the step
        for (Map.Entry<String, Map> entry : step.confs.entrySet()){
            createResource(resolver, path + "/" + entry.getKey(), NT_SLING_FOLDER, entry.getValue());
            logger.debug("built pipe {}'s {} node", path, entry.getKey());
        }
        return resource;
    }

    /**
     * Computes the node name of a step: its explicit name if any, else the default name for its
     * position, else the position itself.
     */
    private static String stepName(Step step, int index) {
        if (StringUtils.isNotBlank(step.name)) {
            return step.name;
        }
        return index < DEFAULT_NAMES.length ? DEFAULT_NAMES[index] : Integer.toString(index);
    }

    /**
     * Persists the container, the optional writer configuration and every step under the given path,
     * commits the resolver and returns the pipe the plumber creates from the root resource.
     * @param path path under which the pipe is persisted
     * @return built pipe
     * @throws PersistenceException in case persistence fails
     * @throws IllegalStateException if no sub-pipe has been added to this builder yet
     */
    @Override
    public Pipe build(String path) throws PersistenceException {
        if (steps == null){
            // fail before touching the repository instead of hitting a NullPointerException half way
            throw new IllegalStateException("no pipe has been added to this builder, nothing to build under " + path);
        }
        Resource pipeResource = persistStep(path, NT_SLING_FOLDER, containerStep);
        if (outputs != null){
            createResource(resolver, path + "/" + OutputWriter.PARAM_WRITER, NT_SLING_FOLDER, outputs);
        }
        final String stepsRoot = path + "/" + Pipe.NN_CONF + "/";
        int index = 0;
        for (Step step : steps){
            persistStep(stepsRoot + stepName(step, index), NT_SLING_ORDERED_FOLDER, step);
            index++;
        }
        resolver.commit();
        logger.debug("built pipe under {}", path);
        return plumber.getPipe(pipeResource);
    }

    /**
     * Builds and executes the pipe without bindings.
     */
    @Override
    public ExecutionResult run() throws Exception {
        return run(null);
    }

    /**
     * Builds and executes the pipe with bindings given as a flat key/value list.
     */
    @Override
    public ExecutionResult runWith(Object... bindings) throws Exception {
        checkArguments(bindings);
        Map bindingsMap = new HashMap();
        writeToMap(bindingsMap, bindings);
        return run(bindingsMap);
    }

    /**
     * Builds the pipe under a generated path and executes it through the plumber with a JSON writer.
     * @param bindings bindings passed to the execution, can be null
     */
    @Override
    public ExecutionResult run(Map bindings) throws Exception {
        JsonWriter writer = new JsonWriter();
        writer.starts();
        Pipe pipe = this.build();
        return plumber.execute(resolver, pipe, bindings,  writer , true);
    }

    /**
     * Builds the pipe under a generated path and asks the plumber to execute it asynchronously.
     * @param bindings bindings passed to the execution
     */
    @Override
    public Job runAsync(Map bindings) throws PersistenceException {
        Pipe pipe = this.build();
        return plumber.executeAsync(resolver, pipe.getResource().getPath(), bindings);
    }

    /**
     * Switches the container to the manifold pipe type and runs the pipe with the number of threads
     * added to the bindings. The container type stays changed on this builder afterwards.
     * @param numThreads value bound to {@code PN_NUM_THREADS}
     * @param additionalBindings extra bindings, can be null; they override {@code PN_NUM_THREADS} on key clash
     */
    @Override
    public ExecutionResult runParallel(int numThreads, Map additionalBindings) throws Exception {
        containerStep.setType(ManifoldPipe.RESOURCE_TYPE);
        // plain map: no anonymous subclass keeping a reference to this builder
        Map bindings = new HashMap();
        bindings.put(PN_NUM_THREADS, numThreads);
        if (additionalBindings != null){
            bindings.putAll(additionalBindings);
        }
        return run(bindings);
    }

    /**
     * holds a subpipe set of informations
     */
    public class Step {
        /** explicit node name of the step, can be null */
        String name;
        /** properties written on the step's own resource, always holds the resource type */
        Map properties;
        /** properties of the step's child configuration nodes, keyed by node name */
        Map<String, Map> confs = new HashMap<>();

        Step(String type){
            properties = new HashMap();
            setType(type);
        }

        /** Sets (or replaces) the sling resource type property of this step. */
        void setType(String type){
            properties.put(SLING_RESOURCE_TYPE_PROPERTY, type);
        }
    }
}