/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.falcon.entity.v0;

import org.apache.falcon.entity.AbstractTestBase;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Load;
import org.apache.falcon.entity.v0.feed.Argument;
import org.apache.falcon.entity.v0.feed.Arguments;
import org.apache.falcon.entity.v0.feed.Clusters;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Extract;
import org.apache.falcon.entity.v0.feed.ExtractMethod;
import org.apache.falcon.entity.v0.feed.FieldsType;
import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
import org.apache.falcon.entity.v0.feed.Import;
import org.apache.falcon.entity.v0.feed.MergeType;
import org.apache.falcon.entity.v0.feed.Export;
import org.apache.falcon.entity.v0.feed.LoadMethod;


import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.datasource.Datasource;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Inputs;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Process;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.List;
import java.util.Set;

/**
 * Entity graph tests.
 */
public class EntityGraphTest extends AbstractTestBase {

    private ConfigurationStore store = ConfigurationStore.get();

    private EntityGraph graph = EntityGraph.get();

    @Test
    public void testOnAdd() throws Exception {

        Process process = new Process();
        process.setName("p1");
        Cluster cluster = new Cluster();
        cluster.setName("c1");
        cluster.setColo("1");
        Feed f1 = addInput(process, "f1", cluster);
        Feed f2 = addInput(process, "f2", cluster);
        Feed f3 = addOutput(process, "f3", cluster);
        Feed f4 = addOutput(process, "f4", cluster);
        org.apache.falcon.entity.v0.process.Cluster processCluster = new org.apache.falcon.entity.v0.process.Cluster();
        processCluster.setName("c1");
        process.setClusters(new org.apache.falcon.entity.v0.process.Clusters());
        process.getClusters().getClusters().add(processCluster);

        store.publish(EntityType.CLUSTER, cluster);
        store.publish(EntityType.FEED, f1);
        store.publish(EntityType.FEED, f2);
        store.publish(EntityType.FEED, f3);
        store.publish(EntityType.FEED, f4);
        store.publish(EntityType.PROCESS, process);

        Set<Entity> entities = graph.getDependents(process);
        Assert.assertEquals(entities.size(), 5);
        Assert.assertTrue(entities.contains(cluster));
        Assert.assertTrue(entities.contains(f1));
        Assert.assertTrue(entities.contains(f2));
        Assert.assertTrue(entities.contains(f3));
        Assert.assertTrue(entities.contains(f4));

        entities = graph.getDependents(f1);
        Assert.assertEquals(entities.size(), 2);
        Assert.assertTrue(entities.contains(process));
        Assert.assertTrue(entities.contains(cluster));

        entities = graph.getDependents(f2);
        Assert.assertEquals(entities.size(), 2);
        Assert.assertTrue(entities.contains(process));
        Assert.assertTrue(entities.contains(cluster));

        entities = graph.getDependents(f3);
        Assert.assertEquals(entities.size(), 2);
        Assert.assertTrue(entities.contains(process));
        Assert.assertTrue(entities.contains(cluster));

        entities = graph.getDependents(f4);
        Assert.assertEquals(entities.size(), 2);
        Assert.assertTrue(entities.contains(process));
        Assert.assertTrue(entities.contains(cluster));

        entities = graph.getDependents(cluster);
        Assert.assertEquals(entities.size(), 5);
        Assert.assertTrue(entities.contains(process));
        Assert.assertTrue(entities.contains(f1));
        Assert.assertTrue(entities.contains(f2));
        Assert.assertTrue(entities.contains(f3));
        Assert.assertTrue(entities.contains(f4));
    }

    private Feed addInput(Process process, String feed, Cluster cluster) {
        if (process.getInputs() == null) {
            process.setInputs(new Inputs());
        }
        Inputs inputs = process.getInputs();
        Input input = new Input();
        input.setFeed(feed);
        inputs.getInputs().add(input);
        Feed f1 = new Feed();
        f1.setName(feed);
        Clusters clusters = new Clusters();
        f1.setClusters(clusters);
        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
                new org.apache.falcon.entity.v0.feed.Cluster();
        feedCluster.setName(cluster.getName());
        clusters.getClusters().add(feedCluster);
        return f1;
    }

    private Feed addFeedImport(String feed, Cluster cluster, Datasource ds) {

        Feed f1 = new Feed();
        f1.setName(feed);
        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
                new org.apache.falcon.entity.v0.feed.Cluster();
        feedCluster.setName(cluster.getName());
        feedCluster.setType(ClusterType.SOURCE);
        Clusters clusters = new Clusters();
        clusters.getClusters().add(feedCluster);
        f1.setClusters(clusters);

        Import imp = getAnImport(MergeType.SNAPSHOT, ds);
        f1.getClusters().getClusters().get(0).setImport(imp);
        return f1;
    }

    private Import getAnImport(MergeType mergeType, Datasource ds) {
        Extract extract = new Extract();
        extract.setType(ExtractMethod.FULL);
        extract.setMergepolicy(mergeType);

        FieldsType fields = new FieldsType();
        FieldIncludeExclude fieldInclude = new FieldIncludeExclude();
        fieldInclude.getFields().add("id");
        fieldInclude.getFields().add("name");
        fields.setIncludes(fieldInclude);

        org.apache.falcon.entity.v0.feed.Datasource source = new org.apache.falcon.entity.v0.feed.Datasource();
        source.setName(ds.getName());
        source.setTableName("test-table");
        source.setExtract(extract);
        source.setFields(fields);

        Argument a1 = new Argument();
        a1.setName("--split_by");
        a1.setValue("id");
        Argument a2 = new Argument();
        a2.setName("--num-mappers");
        a2.setValue("2");
        Arguments args = new Arguments();
        List<Argument> argList = args.getArguments();
        argList.add(a1);
        argList.add(a2);

        Import imp = new Import();
        imp.setSource(source);
        imp.setArguments(args);
        return imp;
    }

    private Feed addFeedExport(String feed, Cluster cluster, Datasource ds) {

        Feed f1 = new Feed();
        f1.setName(feed);
        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
                new org.apache.falcon.entity.v0.feed.Cluster();
        feedCluster.setName(cluster.getName());
        feedCluster.setType(ClusterType.SOURCE);
        Clusters clusters = new Clusters();
        clusters.getClusters().add(feedCluster);
        f1.setClusters(clusters);

        Export exp = getAnExport(LoadMethod.UPDATEONLY, ds);
        f1.getClusters().getClusters().get(0).setExport(exp);
        return f1;
    }

    private Export getAnExport(LoadMethod loadMethod, Datasource ds) {

        org.apache.falcon.entity.v0.feed.Datasource target = new org.apache.falcon.entity.v0.feed.Datasource();
        target.setName(ds.getName());
        target.setTableName("test-table");
        Load load = new Load();
        load.setType(loadMethod);
        target.setLoad(load);
        Export exp = new Export();
        exp.setTarget(target);
        return exp;
    }

    private void attachInput(Process process, Feed feed) {
        if (process.getInputs() == null) {
            process.setInputs(new Inputs());
        }
        Inputs inputs = process.getInputs();
        Input input = new Input();
        input.setFeed(feed.getName());
        inputs.getInputs().add(input);
    }

    private Feed addOutput(Process process, String feed, Cluster cluster) {
        if (process.getOutputs() == null) {
            process.setOutputs(new Outputs());
        }
        Outputs outputs = process.getOutputs();
        Output output = new Output();
        output.setFeed(feed);
        outputs.getOutputs().add(output);
        Feed f1 = new Feed();
        f1.setName(feed);
        Clusters clusters = new Clusters();
        f1.setClusters(clusters);
        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
                new org.apache.falcon.entity.v0.feed.Cluster();
        feedCluster.setName(cluster.getName());
        clusters.getClusters().add(feedCluster);
        return f1;
    }

    @Test
    public void testOnRemove() throws Exception {
        Process process = new Process();
        process.setName("rp1");
        Cluster cluster = new Cluster();
        cluster.setName("rc1");
        cluster.setColo("2");
        org.apache.falcon.entity.v0.process.Cluster processCluster = new org.apache.falcon.entity.v0.process.Cluster();
        processCluster.setName("rc1");
        process.setClusters(new org.apache.falcon.entity.v0.process.Clusters());
        process.getClusters().getClusters().add(processCluster);

        store.publish(EntityType.CLUSTER, cluster);
        store.publish(EntityType.PROCESS, process);

        Set<Entity> entities = graph.getDependents(process);
        Assert.assertEquals(entities.size(), 1);
        Assert.assertTrue(entities.contains(cluster));

        entities = graph.getDependents(cluster);
        Assert.assertEquals(entities.size(), 1);
        Assert.assertTrue(entities.contains(process));

        store.remove(EntityType.PROCESS, process.getName());
        entities = graph.getDependents(cluster);
        Assert.assertTrue(entities.isEmpty());

        entities = graph.getDependents(process);
        Assert.assertTrue(entities.isEmpty());
    }

    @Test
    public void testOnRemove2() throws Exception {

        Process p1 = new Process();
        p1.setName("ap1");
        Process p2 = new Process();
        p2.setName("ap2");
        Cluster cluster = new Cluster();
        cluster.setName("ac1");
        cluster.setColo("3");
        Feed f1 = addInput(p1, "af1", cluster);
        Feed f3 = addOutput(p1, "af3", cluster);
        Feed f2 = addOutput(p2, "af2", cluster);
        attachInput(p2, f3);
        org.apache.falcon.entity.v0.process.Cluster processCluster = new org.apache.falcon.entity.v0.process.Cluster();
        processCluster.setName("ac1");
        p1.setClusters(new org.apache.falcon.entity.v0.process.Clusters());
        p1.getClusters().getClusters().add(processCluster);
        processCluster = new org.apache.falcon.entity.v0.process.Cluster();
        processCluster.setName("ac1");
        p2.setClusters(new org.apache.falcon.entity.v0.process.Clusters());
        p2.getClusters().getClusters().add(processCluster);

        store.publish(EntityType.CLUSTER, cluster);
        store.publish(EntityType.FEED, f1);
        store.publish(EntityType.FEED, f2);
        store.publish(EntityType.FEED, f3);
        store.publish(EntityType.PROCESS, p1);
        store.publish(EntityType.PROCESS, p2);

        Set<Entity> entities = graph.getDependents(p1);
        Assert.assertEquals(entities.size(), 3);
        Assert.assertTrue(entities.contains(cluster));
        Assert.assertTrue(entities.contains(f1));
        Assert.assertTrue(entities.contains(f3));

        entities = graph.getDependents(p2);
        Assert.assertEquals(entities.size(), 3);
        Assert.assertTrue(entities.contains(cluster));
        Assert.assertTrue(entities.contains(f2));
        Assert.assertTrue(entities.contains(f3));

        entities = graph.getDependents(f1);
        Assert.assertEquals(entities.size(), 2);
        Assert.assertTrue(entities.contains(p1));
        Assert.assertTrue(entities.contains(cluster));

        entities = graph.getDependents(f2);
        Assert.assertEquals(entities.size(), 2);
        Assert.assertTrue(entities.contains(p2));
        Assert.assertTrue(entities.contains(cluster));

        entities = graph.getDependents(f3);
        Assert.assertEquals(entities.size(), 3);
        Assert.assertTrue(entities.contains(p2));
        Assert.assertTrue(entities.contains(p1));
        Assert.assertTrue(entities.contains(cluster));

        entities = graph.getDependents(cluster);
        Assert.assertEquals(entities.size(), 5);
        Assert.assertTrue(entities.contains(p1));
        Assert.assertTrue(entities.contains(p2));
        Assert.assertTrue(entities.contains(f1));
        Assert.assertTrue(entities.contains(f2));
        Assert.assertTrue(entities.contains(f3));

        store.remove(EntityType.PROCESS, p2.getName());
        store.remove(EntityType.FEED, f2.getName());

        entities = graph.getDependents(p1);
        Assert.assertEquals(entities.size(), 3);
        Assert.assertTrue(entities.contains(cluster));
        Assert.assertTrue(entities.contains(f1));
        Assert.assertTrue(entities.contains(f3));

        entities = graph.getDependents(p2);
        Assert.assertTrue(entities.isEmpty());

        entities = graph.getDependents(f1);
        Assert.assertEquals(entities.size(), 2);
        Assert.assertTrue(entities.contains(p1));
        Assert.assertTrue(entities.contains(cluster));

        entities = graph.getDependents(f2);
        Assert.assertTrue(entities.isEmpty());

        entities = graph.getDependents(f3);
        Assert.assertEquals(entities.size(), 2);
        Assert.assertTrue(entities.contains(p1));
        Assert.assertTrue(entities.contains(cluster));

        entities = graph.getDependents(cluster);
        Assert.assertEquals(entities.size(), 3);
        Assert.assertTrue(entities.contains(p1));
        Assert.assertTrue(entities.contains(f1));
        Assert.assertTrue(entities.contains(f3));
    }

    @Test
    public void testOnChange() throws Exception {
    }

    @Test
    public void testOnAddImport() throws Exception {

        Datasource ds = new Datasource();
        ds.setName("test-db");
        ds.setColo("c1");

        Cluster cluster = new Cluster();
        cluster.setName("ci1");
        cluster.setColo("c1");

        Feed f1 = addFeedImport("fi1", cluster, ds);

        store.publish(EntityType.CLUSTER, cluster);
        store.publish(EntityType.DATASOURCE, ds);
        store.publish(EntityType.FEED, f1);

        Set<Entity> entities = graph.getDependents(cluster);
        Assert.assertEquals(entities.size(), 1);
        Assert.assertTrue(entities.contains(f1));

        entities = graph.getDependents(ds);
        Assert.assertEquals(entities.size(), 1);
        Assert.assertTrue(entities.contains(f1));

        entities = graph.getDependents(f1);
        Assert.assertEquals(entities.size(), 2);
        Assert.assertTrue(entities.contains(cluster));
        Assert.assertTrue(entities.contains(ds));

        store.remove(EntityType.FEED, "fi1");
        store.remove(EntityType.DATASOURCE, "test-db");
        store.remove(EntityType.CLUSTER, "ci1");
    }

    @Test
    public void testOnAddExport() throws Exception {

        Datasource ds = new Datasource();
        ds.setName("test-db");
        ds.setColo("c1");

        Cluster cluster = new Cluster();
        cluster.setName("ci1");
        cluster.setColo("c1");

        Feed f1 = addFeedExport("fe1", cluster, ds);

        store.publish(EntityType.CLUSTER, cluster);
        store.publish(EntityType.DATASOURCE, ds);
        store.publish(EntityType.FEED, f1);

        Set<Entity> entities = graph.getDependents(cluster);
        Assert.assertEquals(entities.size(), 1);
        Assert.assertTrue(entities.contains(f1));

        entities = graph.getDependents(ds);
        Assert.assertEquals(entities.size(), 1);
        Assert.assertTrue(entities.contains(f1));

        entities = graph.getDependents(f1);
        Assert.assertEquals(entities.size(), 2);
        Assert.assertTrue(entities.contains(cluster));
        Assert.assertTrue(entities.contains(ds));

        store.remove(EntityType.FEED, "fe1");
        store.remove(EntityType.DATASOURCE, "test-db");
        store.remove(EntityType.CLUSTER, "ci1");
    }


    @Test
    public void testOnRemoveDatasource() throws Exception {

        Datasource ds = new Datasource();
        ds.setName("test-db");
        ds.setColo("c1");

        Cluster cluster = new Cluster();
        cluster.setName("ci1");
        cluster.setColo("c1");

        Feed f1 = addFeedImport("fi1", cluster, ds);

        store.publish(EntityType.CLUSTER, cluster);
        store.publish(EntityType.DATASOURCE, ds);
        store.publish(EntityType.FEED, f1);

        store.remove(EntityType.DATASOURCE, "test-db");

        Set<Entity> entities = graph.getDependents(f1);
        Assert.assertEquals(1, entities.size());
        Assert.assertTrue(entities.contains(cluster));
    }
}
