// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package streamer;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Map-backed {@link Pipeline} implementation.
 *
 * <p>
 * Elements are stored by ID in {@link #elements}. Two {@link BaseElement}
 * instances are always registered under the keys {@code IN} and {@code OUT};
 * element-style calls made on the pipeline itself (links, pads, data and
 * events) are forwarded to those two internal elements.
 *
 * <p>
 * This class performs no synchronization of its own.
 */
public class PipelineImpl implements Pipeline {

    /** Identifier of this pipeline, as passed to the constructor. */
    protected String id;

    /**
     * Enables the "Linking" trace printed to standard output by
     * {@link #link(String...)}; true when system property
     * {@code streamer.Pipeline.debug} equals {@code "true"}.
     */
    protected boolean verbose = System.getProperty("streamer.Pipeline.debug", "false").equals("true");

    /** Elements of this pipeline keyed by element ID, including the IN and OUT elements. */
    protected Map<String, Element> elements;

    /**
     * Creates a pipeline which contains only its internal IN and OUT elements.
     *
     * @param id
     *            pipeline identifier, also used as prefix for the IDs of the
     *            internal elements
     */
    public PipelineImpl(String id) {
        this.id = id;
        elements = initElementMap(id);
    }

    /**
     * Builds the initial element map with the internal IN and OUT elements,
     * named {@code id + "." + IN} and {@code id + "." + OUT}.
     *
     * <p>
     * NOTE: this method is invoked from the constructor, so an overriding
     * implementation runs before the subclass fields are initialized.
     *
     * @param id
     *            pipeline identifier
     * @return new mutable map holding the two internal elements
     */
    protected HashMap<String, Element> initElementMap(String id) {
        HashMap<String, Element> initial = new HashMap<String, Element>();

        initial.put(IN, new BaseElement(id + "." + IN));
        initial.put(OUT, new BaseElement(id + "." + OUT));
        return initial;
    }

    /**
     * Looks up a link of the pipeline itself: the IN element is asked first,
     * then the OUT element.
     *
     * @param padName
     *            name of the pad
     * @return link found on the IN element, otherwise whatever the OUT element
     *         returns for that pad
     */
    @Override
    public Link getLink(String padName) {
        Link inputSideLink = elements.get(IN).getLink(padName);
        if (inputSideLink != null) {
            return inputSideLink;
        }

        return elements.get(OUT).getLink(padName);
    }

    /**
     * Returns pads of the pipeline itself: pads of the IN element for
     * {@code Direction.IN}, pads of the OUT element for {@code Direction.OUT}.
     *
     * @param direction
     *            which side of the pipeline to inspect
     * @return pads of the matching internal element, or {@code null} when the
     *         direction is neither IN nor OUT (presumably unreachable; confirm
     *         against the Direction enum)
     */
    @Override
    public Set<String> getPads(Direction direction) {
        switch (direction) {
            case IN:
                return elements.get(IN).getPads(direction);

            case OUT:
                return elements.get(OUT).getPads(direction);

            default:
                return null;
        }
    }

    /**
     * Validates every element of the pipeline and then checks that the
     * internal IN and OUT elements are either not wired at all or wired on
     * both of their sides.
     *
     * @throws RuntimeException
     *             when the IN or OUT element has pads in only one direction
     */
    @Override
    public void validate() {
        for (Element element : elements.values()) {
            element.validate();
        }

        checkPadsBalanced(get(IN), "input");
        checkPadsBalanced(get(OUT), "output");
    }

    /**
     * Fails when the given element has pads in exactly one direction.
     *
     * @param element
     *            internal element to check
     * @param role
     *            word used in the error message ("input" or "output")
     */
    private void checkPadsBalanced(Element element, String role) {
        int outPadsNumber = element.getPads(Direction.OUT).size();
        int inPadsNumber = element.getPads(Direction.IN).size();

        // Unbalanced means: one side has pads while the other side has none.
        if ((outPadsNumber == 0) != (inPadsNumber == 0)) {
            throw new RuntimeException("[ " + this + "] Pads of " + role + " element of pipeline are not balanced. Element: " + element + ", output pads: " +
                element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
        }
    }

    /**
     * Drops the link attached to the named pad from the IN and/or OUT element,
     * whichever currently reports a link for that pad.
     */
    @Override
    public void dropLink(String padName) {
        if (elements.get(IN).getLink(padName) != null) {
            elements.get(IN).dropLink(padName);
        }

        if (elements.get(OUT).getLink(padName) != null) {
            elements.get(OUT).dropLink(padName);
        }
    }

    /**
     * Asks both the IN and the OUT element to drop the given link.
     */
    @Override
    public void dropLink(Link link) {
        elements.get(IN).dropLink(link);
        elements.get(OUT).dropLink(link);
    }

    /**
     * Asks both the IN and the OUT element to replace the existing link with
     * the new one.
     */
    @Override
    public void replaceLink(Link existingLink, Link newLink) {
        elements.get(IN).replaceLink(existingLink, newLink);
        elements.get(OUT).replaceLink(existingLink, newLink);
    }

    /**
     * Attaches a link to the pipeline itself by wiring it to the internal
     * element selected by the direction.
     */
    @Override
    public void setLink(String padName, Link link, Direction direction) {
        // Wire links to internal elements instead.
        // NOTE(review): relies on direction.toString() being equal to the IN/OUT
        // map keys - confirm against the declaration of those constants.
        elements.get(direction.toString()).setLink(padName, link, direction);
    }

    /**
     * Not supported by the pipeline.
     *
     * @throws RuntimeException
     *             always
     */
    @Override
    public void poll(boolean block) {
        throw new RuntimeException("Not implemented.");
    }

    /**
     * Forwards incoming data to the internal IN element.
     */
    @Override
    public void handleData(ByteBuffer buf, Link link) {
        get(IN).handleData(buf, link);
    }

    /**
     * Forwards the event to the internal element which matches the direction.
     * Any other direction value is ignored.
     */
    @Override
    public void handleEvent(Event event, Direction direction) {
        switch (direction) {
            case IN:
                get(IN).handleEvent(event, direction);
                break;
            case OUT:
                get(OUT).handleEvent(event, direction);
                break;
        }
    }

    /**
     * Registers elements in this pipeline under their IDs.
     *
     * <p>
     * The operation is all-or-nothing: every element is checked before any of
     * them is stored, so a failure leaves the pipeline unchanged.
     *
     * @param elements
     *            elements to add
     * @throws RuntimeException
     *             when an ID is already used in this pipeline or is repeated
     *             within the arguments
     * @throws NullPointerException
     *             when one of the elements is {@code null}
     */
    @Override
    public void add(Element... elements) {
        // Collect additions aside so that a failure cannot leave the pipeline half-updated.
        Map<String, Element> staged = new HashMap<String, Element>();

        for (Element element : elements) {
            if (element == null) {
                throw new NullPointerException("Cannot add null element to pipeline " + this + ".");
            }

            String elementId = element.getId();

            boolean alreadyRegistered = this.elements.containsKey(elementId);
            if (alreadyRegistered || staged.containsKey(elementId)) {
                Element existing = alreadyRegistered ? this.elements.get(elementId) : staged.get(elementId);
                throw new RuntimeException("This pipeline already contains element with same ID. New element: " + element + ", existing element: " +
                    existing + ".");
            }

            staged.put(elementId, element);
        }

        this.elements.putAll(staged);
    }

    /**
     * Links the named elements into a chain, creating one {@link SyncLink}
     * between each pair of neighbours.
     *
     * <p>
     * Each argument has the form {@code [inputPad "< "] elementId [" >" outputPad]},
     * for example {@code "tee >out2"}. Omitted pads default to {@code STDIN}
     * and {@code STDOUT}. All names are resolved before the first link is
     * created.
     *
     * @param elementNames
     *            at least two element specifications
     * @throws RuntimeException
     *             when fewer than two names are given or an element cannot be
     *             found in this pipeline
     */
    @Override
    public void link(String... elementNames) {

        if (elementNames.length < 2) {
            throw new RuntimeException("At least two elements are necessary to create link between them.");
        }

        // Parse array of element and pad names

        int count = elementNames.length;
        Element[] chain = new Element[count];
        String[] inputPads = new String[count];
        String[] outputPads = new String[count];

        for (int i = 0; i < count; i++) {
            String spec = elementNames[i];

            // Optional leading "pad< " selects the input pad.
            int inputMarker = spec.indexOf("< ");
            if (inputMarker >= 0) {
                inputPads[i] = spec.substring(0, inputMarker);
                spec = spec.substring(inputMarker + 2);
            } else {
                inputPads[i] = STDIN;
            }

            // Optional trailing " >pad" selects the output pad.
            int outputMarker = spec.indexOf(" >");
            if (outputMarker >= 0) {
                outputPads[i] = spec.substring(outputMarker + 2);
                spec = spec.substring(0, outputMarker);
            } else {
                outputPads[i] = STDOUT;
            }

            chain[i] = get(spec);

            if (chain[i] == null) {
                throw new RuntimeException("Cannot find element by name in this pipeline. Element name: \"" + spec + "\" (" + elementNames[i] + "), pipeline: " +
                    this + ".");
            }
        }

        // Link elements
        for (int i = 0; i < count - 1; i++) {
            Element leftElement = chain[i];
            Element rightElement = chain[i + 1];
            String leftPad = outputPads[i];
            String rightPad = inputPads[i + 1];

            String linkId = leftElement.getId() + " >" + leftPad + " | " + rightPad + "< " + rightElement.getId();

            if (verbose) {
                System.out.println("[" + this + "] INFO: Linking: " + linkId + ".");
            }

            Link link = new SyncLink(linkId);
            leftElement.setLink(leftPad, link, Direction.OUT);
            rightElement.setLink(rightPad, link, Direction.IN);
        }
    }

    /**
     * Adds the elements to this pipeline and then links them into a chain, in
     * the given order, using their default pads.
     */
    @Override
    public void addAndLink(Element... elements) {
        add(elements);
        link(elements);
    }

    /**
     * Links the given elements by their IDs, see {@link #link(String...)}.
     */
    private void link(Element... elements) {
        String[] elementNames = new String[elements.length];

        for (int i = 0; i < elements.length; i++) {
            elementNames[i] = elements[i].getId();
        }

        link(elementNames);
    }

    /**
     * Returns the element registered under the given name.
     *
     * @return the element, or {@code null} when the name is unknown
     */
    @Override
    public Element get(String elementName) {
        return elements.get(elementName);
    }

    /**
     * Returns the link attached to the named pad of the named element.
     *
     * @throws NullPointerException
     *             when this pipeline contains no element with that name
     */
    @Override
    public Link getLink(String elementName, String padName) {
        return findElementOrFail(elementName).getLink(padName);
    }

    /**
     * Attaches the link to the named pad of the named element.
     *
     * @throws NullPointerException
     *             when this pipeline contains no element with that name
     */
    @Override
    public void setLink(String elementName, String padName, Link link, Direction direction) {
        findElementOrFail(elementName).setLink(padName, link, direction);
    }

    /**
     * Looks the element up in the element map and fails with a descriptive
     * message instead of letting the caller dereference {@code null}.
     *
     * <p>
     * NullPointerException is used on purpose: it is what an unknown name
     * produced before the explicit check existed, so callers see the same
     * exception type.
     */
    private Element findElementOrFail(String elementName) {
        Element element = elements.get(elementName);

        if (element == null) {
            throw new NullPointerException("Cannot find element by name in this pipeline. Element name: \"" + elementName + "\", pipeline: " + this + ".");
        }

        return element;
    }

    @Override
    public String getId() {
        return id;
    }

    /**
     * Validates the pipeline and runs the link attached to the given pad of
     * the given element.
     *
     * @param elementName
     *            name of the element which owns the link
     * @param padName
     *            name of the pad the link is attached to
     * @param separateThread
     *            when true, the link is run in a new daemon thread; otherwise
     *            it is run in the calling thread
     * @param waitForStartEvent
     *            when false, {@code Event.STREAM_START} is sent through the
     *            link in OUT direction before the link is run
     * @throws NullPointerException
     *             when the element or the link cannot be found
     */
    @Override
    public void runMainLoop(String elementName, String padName, boolean separateThread, boolean waitForStartEvent) {
        validate();

        Link link = getLink(elementName, padName);

        if (link == null) {
            throw new NullPointerException("Cannot find link. Element name: " + elementName + ", element: " + get(elementName) + ", pad: " + padName + ".");
        }

        if (!waitForStartEvent) {
            link.sendEvent(Event.STREAM_START, Direction.OUT);
        }

        if (!separateThread) {
            link.run();
            return;
        }

        Thread thread = new Thread(link);
        thread.setDaemon(true);
        thread.start();
    }

    @Override
    public String toString() {
        return "Pipeline(" + id + ")";
    }

    /**
     * Example: a fake source feeds a tee element, which is linked to two
     * verbose fake sinks.
     */
    public static void main(String args[]) {
        // System.setProperty("streamer.Link.debug", "true");
        // System.setProperty("streamer.Element.debug", "true");
        // System.setProperty("streamer.Pipeline.debug", "true");

        Pipeline pipeline = new PipelineImpl("main");

        // Create elements
        pipeline.add(new FakeSource("source") {
            {
                this.incommingBufLength = 3;
                this.numBuffers = 10;
                this.delay = 100;
            }
        });
        pipeline.add(new BaseElement("tee"));
        pipeline.add(new FakeSink("sink") {
            {
                this.verbose = true;
            }
        });
        pipeline.add(new FakeSink("sink2") {
            {
                this.verbose = true;
            }
        });

        // Link elements
        pipeline.link("source", "tee", "sink");
        pipeline.link("tee >out2", "sink2");

        // Run main loop
        pipeline.runMainLoop("source", STDOUT, false, false);
    }

}
