blob: 7cdd6c71ac7104b2ec33b3f2b00fa61fad8a94fd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.entity.v0;
import org.apache.falcon.entity.AbstractTestBase;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Load;
import org.apache.falcon.entity.v0.feed.Argument;
import org.apache.falcon.entity.v0.feed.Arguments;
import org.apache.falcon.entity.v0.feed.Clusters;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Extract;
import org.apache.falcon.entity.v0.feed.ExtractMethod;
import org.apache.falcon.entity.v0.feed.FieldsType;
import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
import org.apache.falcon.entity.v0.feed.Import;
import org.apache.falcon.entity.v0.feed.MergeType;
import org.apache.falcon.entity.v0.feed.Export;
import org.apache.falcon.entity.v0.feed.LoadMethod;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.datasource.Datasource;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Inputs;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Process;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.List;
import java.util.Set;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* Entity graph tests.
*/
public class EntityGraphTest extends AbstractTestBase {
private ConfigurationStore store = ConfigurationStore.get();
private EntityGraph graph = EntityGraph.get();
@Test
public void testOnAdd() throws Exception {
Process process = new Process();
process.setName("p1");
Cluster cluster = new Cluster();
cluster.setName("c1");
cluster.setColo("1");
Feed f1 = addInput(process, "f1", cluster);
Feed f2 = addInput(process, "f2", cluster);
Feed f3 = addOutput(process, "f3", cluster);
Feed f4 = addOutput(process, "f4", cluster);
org.apache.falcon.entity.v0.process.Cluster processCluster = new org.apache.falcon.entity.v0.process.Cluster();
processCluster.setName("c1");
process.setClusters(new org.apache.falcon.entity.v0.process.Clusters());
process.getClusters().getClusters().add(processCluster);
store.publish(EntityType.CLUSTER, cluster);
store.publish(EntityType.FEED, f1);
store.publish(EntityType.FEED, f2);
store.publish(EntityType.FEED, f3);
store.publish(EntityType.FEED, f4);
store.publish(EntityType.PROCESS, process);
Set<Entity> entities = graph.getDependents(process);
Assert.assertEquals(entities.size(), 5);
Assert.assertTrue(entities.contains(cluster));
Assert.assertTrue(entities.contains(f1));
Assert.assertTrue(entities.contains(f2));
Assert.assertTrue(entities.contains(f3));
Assert.assertTrue(entities.contains(f4));
entities = graph.getDependents(f1);
Assert.assertEquals(entities.size(), 2);
Assert.assertTrue(entities.contains(process));
Assert.assertTrue(entities.contains(cluster));
entities = graph.getDependents(f2);
Assert.assertEquals(entities.size(), 2);
Assert.assertTrue(entities.contains(process));
Assert.assertTrue(entities.contains(cluster));
entities = graph.getDependents(f3);
Assert.assertEquals(entities.size(), 2);
Assert.assertTrue(entities.contains(process));
Assert.assertTrue(entities.contains(cluster));
entities = graph.getDependents(f4);
Assert.assertEquals(entities.size(), 2);
Assert.assertTrue(entities.contains(process));
Assert.assertTrue(entities.contains(cluster));
entities = graph.getDependents(cluster);
Assert.assertEquals(entities.size(), 5);
Assert.assertTrue(entities.contains(process));
Assert.assertTrue(entities.contains(f1));
Assert.assertTrue(entities.contains(f2));
Assert.assertTrue(entities.contains(f3));
Assert.assertTrue(entities.contains(f4));
}
private Feed addInput(Process process, String feed, Cluster cluster) {
if (process.getInputs() == null) {
process.setInputs(new Inputs());
}
Inputs inputs = process.getInputs();
Input input = new Input();
input.setFeed(feed);
inputs.getInputs().add(input);
Feed f1 = new Feed();
f1.setName(feed);
Clusters clusters = new Clusters();
f1.setClusters(clusters);
org.apache.falcon.entity.v0.feed.Cluster feedCluster =
new org.apache.falcon.entity.v0.feed.Cluster();
feedCluster.setName(cluster.getName());
clusters.getClusters().add(feedCluster);
return f1;
}
private Feed addFeedImport(String feed, Cluster cluster, Datasource ds) {
Feed f1 = new Feed();
f1.setName(feed);
org.apache.falcon.entity.v0.feed.Cluster feedCluster =
new org.apache.falcon.entity.v0.feed.Cluster();
feedCluster.setName(cluster.getName());
feedCluster.setType(ClusterType.SOURCE);
Clusters clusters = new Clusters();
clusters.getClusters().add(feedCluster);
f1.setClusters(clusters);
Import imp = getAnImport(MergeType.SNAPSHOT, ds);
f1.getClusters().getClusters().get(0).setImport(imp);
return f1;
}
private Import getAnImport(MergeType mergeType, Datasource ds) {
Extract extract = new Extract();
extract.setType(ExtractMethod.FULL);
extract.setMergepolicy(mergeType);
FieldsType fields = new FieldsType();
FieldIncludeExclude fieldInclude = new FieldIncludeExclude();
fieldInclude.getFields().add("id");
fieldInclude.getFields().add("name");
fields.setIncludes(fieldInclude);
org.apache.falcon.entity.v0.feed.Datasource source = new org.apache.falcon.entity.v0.feed.Datasource();
source.setName(ds.getName());
source.setTableName("test-table");
source.setExtract(extract);
source.setFields(fields);
Argument a1 = new Argument();
a1.setName("--split_by");
a1.setValue("id");
Argument a2 = new Argument();
a2.setName("--num-mappers");
a2.setValue("2");
Arguments args = new Arguments();
List<Argument> argList = args.getArguments();
argList.add(a1);
argList.add(a2);
Import imp = new Import();
imp.setSource(source);
imp.setArguments(args);
return imp;
}
private Feed addFeedExport(String feed, Cluster cluster, Datasource ds) {
Feed f1 = new Feed();
f1.setName(feed);
org.apache.falcon.entity.v0.feed.Cluster feedCluster =
new org.apache.falcon.entity.v0.feed.Cluster();
feedCluster.setName(cluster.getName());
feedCluster.setType(ClusterType.SOURCE);
Clusters clusters = new Clusters();
clusters.getClusters().add(feedCluster);
f1.setClusters(clusters);
Export exp = getAnExport(LoadMethod.UPDATEONLY, ds);
f1.getClusters().getClusters().get(0).setExport(exp);
return f1;
}
private Export getAnExport(LoadMethod loadMethod, Datasource ds) {
org.apache.falcon.entity.v0.feed.Datasource target = new org.apache.falcon.entity.v0.feed.Datasource();
target.setName(ds.getName());
target.setTableName("test-table");
Load load = new Load();
load.setType(loadMethod);
target.setLoad(load);
Export exp = new Export();
exp.setTarget(target);
return exp;
}
private void attachInput(Process process, Feed feed) {
if (process.getInputs() == null) {
process.setInputs(new Inputs());
}
Inputs inputs = process.getInputs();
Input input = new Input();
input.setFeed(feed.getName());
inputs.getInputs().add(input);
}
private Feed addOutput(Process process, String feed, Cluster cluster) {
if (process.getOutputs() == null) {
process.setOutputs(new Outputs());
}
Outputs outputs = process.getOutputs();
Output output = new Output();
output.setFeed(feed);
outputs.getOutputs().add(output);
Feed f1 = new Feed();
f1.setName(feed);
Clusters clusters = new Clusters();
f1.setClusters(clusters);
org.apache.falcon.entity.v0.feed.Cluster feedCluster =
new org.apache.falcon.entity.v0.feed.Cluster();
feedCluster.setName(cluster.getName());
clusters.getClusters().add(feedCluster);
return f1;
}
@Test
public void testOnRemove() throws Exception {
Process process = new Process();
process.setName("rp1");
Cluster cluster = new Cluster();
cluster.setName("rc1");
cluster.setColo("2");
org.apache.falcon.entity.v0.process.Cluster processCluster = new org.apache.falcon.entity.v0.process.Cluster();
processCluster.setName("rc1");
process.setClusters(new org.apache.falcon.entity.v0.process.Clusters());
process.getClusters().getClusters().add(processCluster);
store.publish(EntityType.CLUSTER, cluster);
store.publish(EntityType.PROCESS, process);
Set<Entity> entities = graph.getDependents(process);
Assert.assertEquals(entities.size(), 1);
Assert.assertTrue(entities.contains(cluster));
entities = graph.getDependents(cluster);
Assert.assertEquals(entities.size(), 1);
Assert.assertTrue(entities.contains(process));
store.remove(EntityType.PROCESS, process.getName());
entities = graph.getDependents(cluster);
Assert.assertTrue(entities.isEmpty());
entities = graph.getDependents(process);
Assert.assertTrue(entities.isEmpty());
}
@Test
public void testOnRemove2() throws Exception {
Process p1 = new Process();
p1.setName("ap1");
Process p2 = new Process();
p2.setName("ap2");
Cluster cluster = new Cluster();
cluster.setName("ac1");
cluster.setColo("3");
Feed f1 = addInput(p1, "af1", cluster);
Feed f3 = addOutput(p1, "af3", cluster);
Feed f2 = addOutput(p2, "af2", cluster);
attachInput(p2, f3);
org.apache.falcon.entity.v0.process.Cluster processCluster = new org.apache.falcon.entity.v0.process.Cluster();
processCluster.setName("ac1");
p1.setClusters(new org.apache.falcon.entity.v0.process.Clusters());
p1.getClusters().getClusters().add(processCluster);
processCluster = new org.apache.falcon.entity.v0.process.Cluster();
processCluster.setName("ac1");
p2.setClusters(new org.apache.falcon.entity.v0.process.Clusters());
p2.getClusters().getClusters().add(processCluster);
store.publish(EntityType.CLUSTER, cluster);
store.publish(EntityType.FEED, f1);
store.publish(EntityType.FEED, f2);
store.publish(EntityType.FEED, f3);
store.publish(EntityType.PROCESS, p1);
store.publish(EntityType.PROCESS, p2);
Set<Entity> entities = graph.getDependents(p1);
Assert.assertEquals(entities.size(), 3);
Assert.assertTrue(entities.contains(cluster));
Assert.assertTrue(entities.contains(f1));
Assert.assertTrue(entities.contains(f3));
entities = graph.getDependents(p2);
Assert.assertEquals(entities.size(), 3);
Assert.assertTrue(entities.contains(cluster));
Assert.assertTrue(entities.contains(f2));
Assert.assertTrue(entities.contains(f3));
entities = graph.getDependents(f1);
Assert.assertEquals(entities.size(), 2);
Assert.assertTrue(entities.contains(p1));
Assert.assertTrue(entities.contains(cluster));
entities = graph.getDependents(f2);
Assert.assertEquals(entities.size(), 2);
Assert.assertTrue(entities.contains(p2));
Assert.assertTrue(entities.contains(cluster));
entities = graph.getDependents(f3);
Assert.assertEquals(entities.size(), 3);
Assert.assertTrue(entities.contains(p2));
Assert.assertTrue(entities.contains(p1));
Assert.assertTrue(entities.contains(cluster));
entities = graph.getDependents(cluster);
Assert.assertEquals(entities.size(), 5);
Assert.assertTrue(entities.contains(p1));
Assert.assertTrue(entities.contains(p2));
Assert.assertTrue(entities.contains(f1));
Assert.assertTrue(entities.contains(f2));
Assert.assertTrue(entities.contains(f3));
store.remove(EntityType.PROCESS, p2.getName());
store.remove(EntityType.FEED, f2.getName());
entities = graph.getDependents(p1);
Assert.assertEquals(entities.size(), 3);
Assert.assertTrue(entities.contains(cluster));
Assert.assertTrue(entities.contains(f1));
Assert.assertTrue(entities.contains(f3));
entities = graph.getDependents(p2);
Assert.assertTrue(entities.isEmpty());
entities = graph.getDependents(f1);
Assert.assertEquals(entities.size(), 2);
Assert.assertTrue(entities.contains(p1));
Assert.assertTrue(entities.contains(cluster));
entities = graph.getDependents(f2);
Assert.assertTrue(entities.isEmpty());
entities = graph.getDependents(f3);
Assert.assertEquals(entities.size(), 2);
Assert.assertTrue(entities.contains(p1));
Assert.assertTrue(entities.contains(cluster));
entities = graph.getDependents(cluster);
Assert.assertEquals(entities.size(), 3);
Assert.assertTrue(entities.contains(p1));
Assert.assertTrue(entities.contains(f1));
Assert.assertTrue(entities.contains(f3));
}
@Test
public void testOnChange() throws Exception {
}
@Test
public void testOnAddImport() throws Exception {
Datasource ds = new Datasource();
ds.setName("test-db");
ds.setColo("c1");
Cluster cluster = new Cluster();
cluster.setName("ci1");
cluster.setColo("c1");
Feed f1 = addFeedImport("fi1", cluster, ds);
store.publish(EntityType.CLUSTER, cluster);
store.publish(EntityType.DATASOURCE, ds);
store.publish(EntityType.FEED, f1);
Set<Entity> entities = graph.getDependents(cluster);
Assert.assertEquals(entities.size(), 1);
Assert.assertTrue(entities.contains(f1));
entities = graph.getDependents(ds);
Assert.assertEquals(entities.size(), 1);
Assert.assertTrue(entities.contains(f1));
entities = graph.getDependents(f1);
Assert.assertEquals(entities.size(), 2);
Assert.assertTrue(entities.contains(cluster));
Assert.assertTrue(entities.contains(ds));
store.remove(EntityType.FEED, "fi1");
store.remove(EntityType.DATASOURCE, "test-db");
store.remove(EntityType.CLUSTER, "ci1");
}
@Test
public void testOnAddExport() throws Exception {
Datasource ds = new Datasource();
ds.setName("test-db");
ds.setColo("c1");
Cluster cluster = new Cluster();
cluster.setName("ci1");
cluster.setColo("c1");
Feed f1 = addFeedExport("fe1", cluster, ds);
store.publish(EntityType.CLUSTER, cluster);
store.publish(EntityType.DATASOURCE, ds);
store.publish(EntityType.FEED, f1);
Set<Entity> entities = graph.getDependents(cluster);
Assert.assertEquals(entities.size(), 1);
Assert.assertTrue(entities.contains(f1));
entities = graph.getDependents(ds);
Assert.assertEquals(entities.size(), 1);
Assert.assertTrue(entities.contains(f1));
entities = graph.getDependents(f1);
Assert.assertEquals(entities.size(), 2);
Assert.assertTrue(entities.contains(cluster));
Assert.assertTrue(entities.contains(ds));
store.remove(EntityType.FEED, "fe1");
store.remove(EntityType.DATASOURCE, "test-db");
store.remove(EntityType.CLUSTER, "ci1");
}
@Test
public void testOnRemoveDatasource() throws Exception {
Datasource ds = new Datasource();
ds.setName("test-db");
ds.setColo("c1");
Cluster cluster = new Cluster();
cluster.setName("ci1");
cluster.setColo("c1");
Feed f1 = addFeedImport("fi1", cluster, ds);
store.publish(EntityType.CLUSTER, cluster);
store.publish(EntityType.DATASOURCE, ds);
store.publish(EntityType.FEED, f1);
store.remove(EntityType.DATASOURCE, "test-db");
Set<Entity> entities = graph.getDependents(f1);
Assert.assertEquals(1, entities.size());
Assert.assertTrue(entities.contains(cluster));
}
@BeforeSuite
@AfterSuite
public void cleanup() throws IOException {
Path path = new Path(StartupProperties.get().
getProperty("config.store.uri"));
FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
fs.delete(path, true);
System.out.println("Cleaned up " + path);
}
}