blob: 151211efa00fb30cab59c3fa16b059b5099d2163 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.tagpropagation;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.TestModules;
import org.apache.atlas.discovery.AtlasLineageService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.ZipFileResourceTestUtils;
import org.apache.atlas.repository.impexp.ZipSource;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.runner.LocalSolrRunner;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.BOTH;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ClassificationPropagationTest {
public static final String HDFS_PATH_EMPLOYEES = "HDFS_PATH_EMPLOYEES";
public static final String EMPLOYEES1_TABLE = "EMPLOYEES1_TABLE";
public static final String EMPLOYEES2_TABLE = "EMPLOYEES2_TABLE";
public static final String EMPLOYEES_UNION_TABLE = "EMPLOYEES_UNION_TABLE";
public static final String EMPLOYEES1_PROCESS = "EMPLOYEES1_PROCESS";
public static final String EMPLOYEES2_PROCESS = "EMPLOYEES2_PROCESS";
public static final String EMPLOYEES_UNION_PROCESS = "EMPLOYEES_UNION_PROCESS";
@Inject
private AtlasTypeDefStore typeDefStore;
@Inject
private AtlasTypeRegistry typeRegistry;
@Inject
private AtlasEntityStore entityStore;
@Inject
private AtlasRelationshipStore relationshipStore;
@Inject
private ImportService importService;
@Inject
private AtlasLineageService lineageService;
private Map<String, String> entitiesMap;
private AtlasLineageInfo lineageInfo;
@BeforeClass
public void setup() {
RequestContextV1.clear();
loadModelFilesAndImportTestData();
}
@AfterClass
public void clear() throws Exception {
AtlasGraphProvider.cleanup();
if (useLocalSolr()) {
LocalSolrRunner.stop();
}
}
/** This test uses the lineage graph:
*
[Process1] ----> [Employees1]
/ \
/ \
[hdfs_employees] [Process3] ----> [ EmployeesUnion ]
\ /
\ /
[Process2] ----> [Employees2]
*/
@Test
public void addClassification_PropagateFalse() throws AtlasBaseException {
AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
AtlasClassification tag2 = new AtlasClassification("tag2"); tag2.setPropagate(false); tag2.setEntityGuid(hdfs_employees.getGuid());
// add classification with propagate to 'false'
addClassification(hdfs_employees, tag2);
List<String> propagatedToEntities = Arrays.asList(EMPLOYEES1_PROCESS, EMPLOYEES2_PROCESS, EMPLOYEES1_TABLE,
EMPLOYEES2_TABLE, EMPLOYEES_UNION_PROCESS, EMPLOYEES_UNION_TABLE);
assertClassificationNotExistInEntities(propagatedToEntities, tag2);
}
@Test(dependsOnMethods = {"addClassification_PropagateFalse"})
public void updateClassification_PropagateFalseToTrue() throws AtlasBaseException {
AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
AtlasClassification tag2 = new AtlasClassification("tag2"); tag2.setEntityGuid(hdfs_employees.getGuid());
//update tag2 propagate to 'true'
tag2 = getClassification(hdfs_employees, tag2); tag2.setPropagate(true);
updateClassifications(hdfs_employees, tag2);
List<String> propagatedToEntities = Arrays.asList(EMPLOYEES1_PROCESS, EMPLOYEES2_PROCESS, EMPLOYEES1_TABLE,
EMPLOYEES2_TABLE, EMPLOYEES_UNION_PROCESS, EMPLOYEES_UNION_TABLE);
assertClassificationExistInEntities(propagatedToEntities, tag2);
deleteClassification(hdfs_employees, tag2);
}
@Test(dependsOnMethods = {"updateClassification_PropagateFalseToTrue"})
public void addClassification_PropagateTrue() throws AtlasBaseException {
AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
AtlasClassification tag1 = new AtlasClassification("tag1"); tag1.setPropagate(true); tag1.setEntityGuid(hdfs_employees.getGuid());
// add classification with propagate flag to 'true'
addClassification(hdfs_employees, tag1);
List<String> propagatedToEntities = Arrays.asList(EMPLOYEES1_PROCESS, EMPLOYEES2_PROCESS, EMPLOYEES1_TABLE,
EMPLOYEES2_TABLE, EMPLOYEES_UNION_PROCESS, EMPLOYEES_UNION_TABLE);
assertClassificationExistInEntities(propagatedToEntities, tag1);
}
@Test(dependsOnMethods = {"addClassification_PropagateTrue"})
public void updateClassification_PropagateTrueToFalse() throws AtlasBaseException {
AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
AtlasClassification tag1 = new AtlasClassification("tag1"); tag1.setEntityGuid(hdfs_employees.getGuid());
List<String> propagatedToEntities = Arrays.asList(EMPLOYEES1_PROCESS, EMPLOYEES2_PROCESS, EMPLOYEES1_TABLE,
EMPLOYEES2_TABLE, EMPLOYEES_UNION_PROCESS, EMPLOYEES_UNION_TABLE);
// update propagate flag to 'false'
tag1 = getClassification(hdfs_employees, tag1); tag1.setPropagate(false);
updateClassifications(hdfs_employees, tag1);
assertClassificationNotExistInEntities(propagatedToEntities, tag1);
}
@Test(dependsOnMethods = {"updateClassification_PropagateTrueToFalse"})
public void deleteClassification_PropagateTrue() throws AtlasBaseException {
AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
AtlasClassification tag1 = new AtlasClassification("tag1"); tag1.setPropagate(true); tag1.setEntityGuid(hdfs_employees.getGuid());
deleteClassification(hdfs_employees, tag1);
List<String> propagatedToEntities = Arrays.asList(EMPLOYEES1_PROCESS, EMPLOYEES2_PROCESS, EMPLOYEES1_TABLE,
EMPLOYEES2_TABLE, EMPLOYEES_UNION_PROCESS, EMPLOYEES_UNION_TABLE);
assertClassificationNotExistInEntities(propagatedToEntities, tag1);
}
@Test(dependsOnMethods = {"deleteClassification_PropagateTrue"})
public void propagateSameTagFromDifferentEntities() throws AtlasBaseException {
// add tag1 to hdfs_employees
AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
AtlasClassification tag1 = new AtlasClassification("tag1");
tag1.setPropagate(true);
tag1.setEntityGuid(hdfs_employees.getGuid());
addClassification(hdfs_employees, tag1);
// add tag1 to employees2
AtlasEntity employees2_table = getEntity(EMPLOYEES2_TABLE);
tag1 = new AtlasClassification("tag1");
tag1.setPropagate(true);
tag1.setEntityGuid(employees2_table.getGuid());
addClassification(employees2_table, tag1);
// employees_union table should have two tags 'tag1' propagated from hdfs_employees and employees2 table
AtlasEntity employees_union_table = getEntity(EMPLOYEES_UNION_TABLE);
List<AtlasClassification> classifications = employees_union_table.getClassifications();
assertNotNull(classifications);
assertEquals(classifications.size(), 2);
// assert same tag propagated from hdfs_employees and employees2
assertEquals(classifications.get(0).getTypeName(), tag1.getTypeName());
assertEquals(classifications.get(1).getTypeName(), tag1.getTypeName());
if (classifications.get(0).getEntityGuid().equals(hdfs_employees.getGuid())) {
assertEquals(classifications.get(1).getEntityGuid(), employees2_table.getGuid());
}
if (classifications.get(0).getEntityGuid().equals(employees2_table.getGuid())) {
assertEquals(classifications.get(1).getEntityGuid(), hdfs_employees.getGuid());
}
// cleanup
deleteClassification(hdfs_employees, tag1);
deleteClassification(employees2_table, tag1);
}
@Test(dependsOnMethods = {"propagateSameTagFromDifferentEntities"})
public void updatePropagateTagsValue() throws AtlasBaseException {
AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
AtlasEntity employees2_table = getEntity(EMPLOYEES2_TABLE);
AtlasEntity employees_union_process = getEntity(EMPLOYEES_UNION_PROCESS);
AtlasEntity employees_union_table = getEntity(EMPLOYEES_UNION_TABLE);
AtlasClassification tag1 = new AtlasClassification("tag1"); tag1.setPropagate(true); tag1.setEntityGuid(hdfs_employees.getGuid());
AtlasClassification tag2 = new AtlasClassification("tag2"); tag1.setPropagate(true); tag2.setEntityGuid(employees2_table.getGuid());
AtlasClassification tag3 = new AtlasClassification("tag3"); tag1.setPropagate(true); tag3.setEntityGuid(employees_union_process.getGuid());
AtlasClassification tag4 = new AtlasClassification("tag4"); tag1.setPropagate(true); tag4.setEntityGuid(employees_union_table.getGuid());
// add tag1 to hdfs_employees, tag2 to employees2, tag3 to process3, tag4 to employees_union
addClassification(hdfs_employees, tag1);
addClassification(employees2_table, tag2);
addClassification(employees_union_process, tag3);
addClassification(employees_union_table, tag4);
//validate if tag1, tag2, tag3 propagated to employees_union table
assertClassificationExistInEntity(EMPLOYEES_UNION_TABLE, tag1);
assertClassificationExistInEntity(EMPLOYEES_UNION_TABLE, tag2);
assertClassificationExistInEntity(EMPLOYEES_UNION_TABLE, tag3);
assertClassificationExistInEntity(EMPLOYEES_UNION_TABLE, tag4);
// change propagation between employees2 -> process3 from TWO_TO_ONE to NONE
AtlasRelationship employees2_process_relationship = getRelationship(EMPLOYEES2_TABLE, EMPLOYEES_UNION_PROCESS);
assertEquals(employees2_process_relationship.getPropagateTags(), TWO_TO_ONE);
employees2_process_relationship.setPropagateTags(NONE);
relationshipStore.update(employees2_process_relationship);
// validate tag1 propagated to employees_union through other path
assertClassificationExistInEntity(EMPLOYEES_UNION_TABLE, tag1);
// validate tag2 is no more propagated to employees_union
assertClassificationNotExistInEntity(EMPLOYEES_UNION_TABLE, tag2);
// change propagation between employees2 -> process3 from NONE to TWO_TO_ONE
employees2_process_relationship = getRelationship(EMPLOYEES2_TABLE, EMPLOYEES_UNION_PROCESS);
assertEquals(employees2_process_relationship.getPropagateTags(), NONE);
employees2_process_relationship.setPropagateTags(TWO_TO_ONE);
relationshipStore.update(employees2_process_relationship);
// validate tag2 is propagated to employees_union
assertClassificationExistInEntity(EMPLOYEES_UNION_TABLE, tag2);
//update propagation to BOTH for edge process3 --> employee_union
AtlasRelationship process3_employee_union_relationship = getRelationship(EMPLOYEES_UNION_PROCESS, EMPLOYEES_UNION_TABLE);
assertEquals(process3_employee_union_relationship.getPropagateTags(), ONE_TO_TWO);
process3_employee_union_relationship.setPropagateTags(BOTH);
relationshipStore.update(process3_employee_union_relationship);
// process3 should get 'tag4' from employee_union and employee_union should get tag3 from process3 (BOTH)
assertClassificationExistInEntity(EMPLOYEES_UNION_PROCESS, tag4);
assertClassificationExistInEntity(EMPLOYEES_UNION_TABLE, tag3);
//update propagation to ONE_TO_TWO for edge process3 --> employee_union
process3_employee_union_relationship.setPropagateTags(ONE_TO_TWO);
relationshipStore.update(process3_employee_union_relationship);
assertClassificationNotExistInEntity(EMPLOYEES_UNION_PROCESS, tag4);
//cleanup
deleteClassification(hdfs_employees, tag1);
deleteClassification(employees2_table, tag2);
deleteClassification(employees_union_process, tag3);
deleteClassification(employees_union_table, tag4);
}
@Test(dependsOnMethods = {"updatePropagateTagsValue"})
public void addBlockedPropagatedClassifications() throws AtlasBaseException {
AtlasEntity hdfs_path = getEntity(HDFS_PATH_EMPLOYEES);
AtlasEntity employees1 = getEntity(EMPLOYEES1_TABLE);
AtlasEntity employees2 = getEntity(EMPLOYEES2_TABLE);
AtlasEntity employees_union = getEntity(EMPLOYEES_UNION_TABLE);
AtlasClassification PII_tag1 = new AtlasClassification("PII"); PII_tag1.setPropagate(true);
PII_tag1.setAttribute("type", "from hdfs_path entity");
PII_tag1.setAttribute("valid", true);
AtlasClassification PII_tag2 = new AtlasClassification("PII"); PII_tag2.setPropagate(true);
PII_tag2.setAttribute("type", "from employees1 entity");
PII_tag2.setAttribute("valid", true);
AtlasClassification PII_tag3 = new AtlasClassification("PII"); PII_tag3.setPropagate(true);
PII_tag3.setAttribute("type", "from employees2 entity");
PII_tag3.setAttribute("valid", true);
AtlasClassification PII_tag4 = new AtlasClassification("PII"); PII_tag4.setPropagate(true);
PII_tag4.setAttribute("type", "from employees_union entity");
PII_tag4.setAttribute("valid", true);
// add PII to hdfs_path, employees1, employees2 and employee_union
addClassification(hdfs_path, PII_tag1);
addClassification(employees1, PII_tag2);
addClassification(employees2, PII_tag3);
// check 4 PII tags exists in employee_union table
assertClassificationExistInEntity(EMPLOYEES_UNION_TABLE, PII_tag1.getTypeName(), hdfs_path.getGuid());
assertClassificationExistInEntity(EMPLOYEES_UNION_TABLE, PII_tag2.getTypeName(), employees1.getGuid());
assertClassificationExistInEntity(EMPLOYEES_UNION_TABLE, PII_tag3.getTypeName(), employees2.getGuid());
AtlasRelationship process3_employee_union_relationship = getRelationship(EMPLOYEES_UNION_PROCESS, EMPLOYEES_UNION_TABLE);
Set<AtlasClassification> propagatedClassifications = process3_employee_union_relationship.getPropagatedClassifications();
Set<AtlasClassification> blockedClassifications = process3_employee_union_relationship.getBlockedPropagatedClassifications();
assertNotNull(propagatedClassifications);
assertClassificationEquals(propagatedClassifications, PII_tag1);
assertClassificationEquals(propagatedClassifications, PII_tag2);
assertClassificationEquals(propagatedClassifications, PII_tag3);
assertTrue(blockedClassifications.isEmpty());
// block PII tag propagating from employees1 and employees2
PII_tag2.setEntityGuid(employees1.getGuid());
PII_tag3.setEntityGuid(employees2.getGuid());
process3_employee_union_relationship.setBlockedPropagatedClassifications(new HashSet<>(Arrays.asList(PII_tag2, PII_tag3)));
relationshipStore.update(process3_employee_union_relationship);
process3_employee_union_relationship = getRelationship(EMPLOYEES_UNION_PROCESS, EMPLOYEES_UNION_TABLE);
propagatedClassifications = process3_employee_union_relationship.getPropagatedClassifications();
blockedClassifications = process3_employee_union_relationship.getBlockedPropagatedClassifications();
assertClassificationEquals(propagatedClassifications, PII_tag1);
assertTrue(!blockedClassifications.isEmpty());
assertClassificationEquals(blockedClassifications, PII_tag2);
assertClassificationEquals(blockedClassifications, PII_tag3);
assertClassificationNotExistInEntity(EMPLOYEES_UNION_TABLE, PII_tag2);
assertClassificationNotExistInEntity(EMPLOYEES_UNION_TABLE, PII_tag3);
// assert only PII from hdfs_path is propagated to employees_union, PII from employees1 and employees2 is blocked.
assertEquals(getEntity(EMPLOYEES_UNION_TABLE).getClassifications().size(), 1);
assertClassificationExistInEntity(EMPLOYEES_UNION_TABLE, PII_tag1.getTypeName(), hdfs_path.getGuid());
}
private void assertClassificationEquals(Set<AtlasClassification> propagatedClassifications, AtlasClassification expected) {
String expectedTypeName = expected.getTypeName();
for (AtlasClassification c : propagatedClassifications) {
if(c.getTypeName().equals(expectedTypeName)) {
assertTrue(c.isPropagate() == expected.isPropagate(), "isPropgate does not match");
assertTrue(c.getValidityPeriods() == expected.getValidityPeriods(), "validityPeriods do not match");
return;
}
}
fail(expectedTypeName + " could not be found");
}
@Test(dependsOnMethods = {"addBlockedPropagatedClassifications"})
public void removeBlockedPropagatedClassifications () throws AtlasBaseException {
AtlasEntity hdfs_path = getEntity(HDFS_PATH_EMPLOYEES);
AtlasEntity employees1 = getEntity(EMPLOYEES1_TABLE);
AtlasEntity employees2 = getEntity(EMPLOYEES2_TABLE);
AtlasClassification PII_tag1 = new AtlasClassification("PII"); PII_tag1.setPropagate(true); PII_tag1.setEntityGuid(hdfs_path.getGuid());
PII_tag1.setAttribute("type", "from hdfs_path entity");
PII_tag1.setAttribute("valid", true);
AtlasClassification PII_tag2 = new AtlasClassification("PII"); PII_tag2.setPropagate(true); PII_tag2.setEntityGuid(employees1.getGuid());
PII_tag2.setAttribute("type", "from employees1 entity");
PII_tag2.setAttribute("valid", true);
AtlasClassification PII_tag3 = new AtlasClassification("PII"); PII_tag3.setPropagate(true); PII_tag3.setEntityGuid(employees2.getGuid());
PII_tag3.setAttribute("type", "from employees2 entity");
PII_tag3.setAttribute("valid", true);
AtlasRelationship process3_employee_union_relationship = getRelationship(EMPLOYEES_UNION_PROCESS, EMPLOYEES_UNION_TABLE);
// remove blocked propagated classification entry for PII (from employees2) - allow PII from employees2 to propagate to employee_union
process3_employee_union_relationship.setBlockedPropagatedClassifications(new HashSet<>(Arrays.asList(PII_tag3)));
relationshipStore.update(process3_employee_union_relationship);
process3_employee_union_relationship = getRelationship(EMPLOYEES_UNION_PROCESS, EMPLOYEES_UNION_TABLE);
Set<AtlasClassification> propagatedClassifications = process3_employee_union_relationship.getPropagatedClassifications();
Set<AtlasClassification> blockedClassifications = process3_employee_union_relationship.getBlockedPropagatedClassifications();
assertClassificationExistInList(propagatedClassifications, PII_tag1);
assertClassificationExistInList(propagatedClassifications, PII_tag2);
assertClassificationExistInList(blockedClassifications, PII_tag3);
// remove all blocked propagated classification entry
process3_employee_union_relationship.setBlockedPropagatedClassifications(Collections.emptySet());
relationshipStore.update(process3_employee_union_relationship);
process3_employee_union_relationship = getRelationship(EMPLOYEES_UNION_PROCESS, EMPLOYEES_UNION_TABLE);
propagatedClassifications = process3_employee_union_relationship.getPropagatedClassifications();
blockedClassifications = process3_employee_union_relationship.getBlockedPropagatedClassifications();
assertClassificationExistInList(propagatedClassifications, PII_tag1);
assertClassificationExistInList(propagatedClassifications, PII_tag2);
assertClassificationExistInList(propagatedClassifications, PII_tag3);
assertTrue(blockedClassifications.isEmpty());
}
private void assertClassificationExistInList(Set<AtlasClassification> classifications, AtlasClassification classification) {
String classificationName = classification.getTypeName();
String entityGuid = classification.getEntityGuid();
boolean foundClassification = false;
for (AtlasClassification c : classifications) {
if (c.getTypeName().equals(classificationName) && c.getEntityGuid().equals(entityGuid)) {
foundClassification = true;
}
}
if (!foundClassification) {
fail("Propagated classification is not present in classifications list!");
}
}
private void assertClassificationExistInEntities(List<String> entityNames, AtlasClassification classification) throws AtlasBaseException {
for (String entityName : entityNames) {
assertClassificationExistInEntity(entityName, classification);
}
}
private void assertClassificationExistInEntity(String entityName, AtlasClassification classification) throws AtlasBaseException {
assertClassificationExistInEntity(entityName, classification.getTypeName(), classification.getEntityGuid());
}
private void assertClassificationExistInEntity(String entityName, String tagName, String sourceEntityGuid) throws AtlasBaseException {
List<AtlasClassification> classifications = getEntity(entityName).getClassifications();
String classificationName = tagName;
String entityGuid = sourceEntityGuid;
if (CollectionUtils.isNotEmpty(classifications)) {
boolean foundClassification = false;
for (AtlasClassification c : classifications) {
if (c.getTypeName().equals(classificationName) && c.getEntityGuid().equals(entityGuid)) {
foundClassification = true;
}
}
if (!foundClassification) {
fail("Propagated classification is not present in entity!");
}
}
}
private void assertClassificationNotExistInEntities(List<String> entityNames, AtlasClassification classification) throws AtlasBaseException {
for (String entityName : entityNames) {
assertClassificationNotExistInEntity(entityName, classification);
}
}
private void assertClassificationNotExistInEntity(String entityName, AtlasClassification classification) throws AtlasBaseException {
List<AtlasClassification> classifications = getEntity(entityName).getClassifications();
String classificationName = classification.getTypeName();
String entityGuid = classification.getEntityGuid();
if (CollectionUtils.isNotEmpty(classifications)) {
for (AtlasClassification c : classifications) {
if (c.getTypeName().equals(classificationName) && c.getEntityGuid().equals(entityGuid)) {
fail("Propagated classification should not be present in entity!");
}
}
}
}
private void loadModelFilesAndImportTestData() {
try {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
loadModelFromJson("1000-Hadoop/1020-fs_model.json", typeDefStore, typeRegistry);
loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
loadModelFromJson("1000-Hadoop/patches/001-hive_column_add_position.json", typeDefStore, typeRegistry);
loadModelFromJson("1000-Hadoop/patches/002-hive_column_table_add_options.json", typeDefStore, typeRegistry);
loadModelFromJson("1000-Hadoop/patches/003-hive_column_update_table_remove_constraint.json", typeDefStore, typeRegistry);
loadSampleClassificationDefs();
runImportWithNoParameters(importService, getZipSource("tag-propagation-data.zip"));
initializeEntitiesMap();
} catch (AtlasBaseException | IOException e) {
throw new SkipException("Model loading failed!");
}
}
public static ZipSource getZipSource(String fileName) throws IOException {
FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
return new ZipSource(fs);
}
private void loadSampleClassificationDefs() throws AtlasBaseException {
AtlasClassificationDef tag1 = new AtlasClassificationDef("tag1");
AtlasClassificationDef tag2 = new AtlasClassificationDef("tag2");
AtlasClassificationDef tag3 = new AtlasClassificationDef("tag3");
AtlasClassificationDef tag4 = new AtlasClassificationDef("tag4");
AtlasClassificationDef PII = new AtlasClassificationDef("PII");
PII.addAttribute(new AtlasAttributeDef("type", "string"));
PII.addAttribute(new AtlasAttributeDef("valid", "boolean"));
typeDefStore.createTypesDef(new AtlasTypesDef(Collections.emptyList(), Collections.emptyList(),
Arrays.asList(tag1, tag2, tag3, tag4, PII),
Collections.emptyList(), Collections.emptyList()));
}
private void initializeEntitiesMap() throws AtlasBaseException {
entitiesMap = new HashMap<>();
entitiesMap.put(HDFS_PATH_EMPLOYEES, "a3955120-ac17-426f-a4af-972ec8690e5f");
entitiesMap.put(EMPLOYEES1_TABLE, "cdf0040e-739e-4590-a137-964d10e73573");
entitiesMap.put(EMPLOYEES2_TABLE, "0a3e66b6-472c-48b3-8453-abdd24f9494f");
entitiesMap.put(EMPLOYEES_UNION_TABLE, "1ceac963-1a2b-476a-a269-10396187d406");
entitiesMap.put(EMPLOYEES1_PROCESS, "26dae763-85b7-40af-8516-71056d91d2de");
entitiesMap.put(EMPLOYEES2_PROCESS, "c0201260-dbeb-45f4-930d-5129eab31dc9");
entitiesMap.put(EMPLOYEES_UNION_PROCESS, "470a2d1e-b1fd-47de-8f2d-8dfd0a0275a7");
lineageInfo = lineageService.getAtlasLineageInfo(entitiesMap.get(HDFS_PATH_EMPLOYEES), LineageDirection.BOTH, 3);
}
private AtlasEntity getEntity(String entityName) throws AtlasBaseException {
String entityGuid = entitiesMap.get(entityName);
AtlasEntityWithExtInfo entityWithExtInfo = entityStore.getById(entityGuid);
return entityWithExtInfo.getEntity();
}
private AtlasClassification getClassification(AtlasEntity hdfs_employees, AtlasClassification tag2) throws AtlasBaseException {
return entityStore.getClassification(hdfs_employees.getGuid(), tag2.getTypeName());
}
private void addClassification(AtlasEntity entity, AtlasClassification classification) throws AtlasBaseException {
addClassifications(entity, Collections.singletonList(classification));
}
private void addClassifications(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
entityStore.addClassifications(entity.getGuid(), classifications);
}
private void updateClassifications(AtlasEntity entity, AtlasClassification classification) throws AtlasBaseException {
updateClassifications(entity, Collections.singletonList(classification));
}
private void updateClassifications(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
entityStore.updateClassifications(entity.getGuid(), classifications);
}
private void deleteClassification(AtlasEntity entity, AtlasClassification classification) throws AtlasBaseException {
deleteClassifications(entity, Collections.singletonList(classification.getTypeName()));
}
private void deleteClassifications(AtlasEntity entity, List<String> classificationNames) throws AtlasBaseException {
entityStore.deleteClassifications(entity.getGuid(), classificationNames);
}
private AtlasRelationship getRelationship(String fromEntityName, String toEntityName) throws AtlasBaseException {
String fromEntityId = entitiesMap.get(fromEntityName);
String toEntityId = entitiesMap.get(toEntityName);
Set<LineageRelation> relations = lineageInfo.getRelations();
String relationshipGuid = null;
for (AtlasLineageInfo.LineageRelation relation : relations) {
if (relation.getFromEntityId().equals(fromEntityId) && relation.getToEntityId().equals(toEntityId)) {
relationshipGuid = relation.getRelationshipId();
}
}
return relationshipStore.getById(relationshipGuid);
}
}