blob: d3f4b57adf171400ca32060b92a670242ee3d495 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.entitytransform;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AttributeTransform;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.entitytransform.TransformationConstants.HDFS_PATH;
import static org.apache.atlas.entitytransform.TransformationConstants.HIVE_COLUMN;
import static org.apache.atlas.entitytransform.TransformationConstants.HIVE_DATABASE;
import static org.apache.atlas.entitytransform.TransformationConstants.HIVE_STORAGE_DESCRIPTOR;
import static org.apache.atlas.entitytransform.TransformationConstants.QUALIFIED_NAME_ATTRIBUTE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.apache.atlas.entitytransform.TransformationConstants.HIVE_TABLE;
public class TransformationHandlerTest {
private static final Logger LOG = LoggerFactory.getLogger(TransformationHandlerTest.class);
private static final String TYPENAME_REFERENCEABLE = "Referenceable";
private static final String TYPENAME_ASSET = "Asset";
private static final String TYPENAME_NON_ASSET = "non_asset";
private static final String[] CLUSTER_NAMES = new String[] { "cl1", "prod" };
private static final String[] DATABASE_NAMES = new String[] { "hr", "sales", "engg" };
private static final String[] TABLE_NAMES = new String[] { "employees", "products", "invoice" };
private static final String[] COLUMN_NAMES = new String[] { "name", "age", "dob" };
private final String ATTR_NAME_QUALIFIED_NAME = "qualifiedName";
@Test
public void testHdfsClusterRenameHandler() {
// Rename clusterName from cl1 to cl2
AttributeTransform p1 = new AttributeTransform(Collections.singletonMap("hdfs_path.clusterName", "EQUALS: cl1"),
Collections.singletonMap("hdfs_path.clusterName", "SET: cl2"));
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p1));
for (AtlasEntity hdfsPath : getHdfsPathEntities()) {
String qualifiedName = (String) hdfsPath.getAttribute(ATTR_NAME_QUALIFIED_NAME);
boolean endsWithCl1 = qualifiedName.endsWith("@cl1");
applyTransforms(hdfsPath, handlers);
String transformedValue = (String) hdfsPath.getAttribute(ATTR_NAME_QUALIFIED_NAME);
if (endsWithCl1) {
assertTrue(transformedValue.endsWith("@cl2"), transformedValue + ": expected to end with @cl2");
} else {
assertEquals(qualifiedName, transformedValue, "not expected to change");
}
}
}
@Test
public void testHdfsClusterNameToggleCaseHandler() {
// Change clusterName to Upper case
AttributeTransform p1 = new AttributeTransform(Collections.singletonMap("hdfs_path.clusterName", "EQUALS: cl1"),
Collections.singletonMap("hdfs_path.clusterName", "TO_UPPER:"));
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p1));
List<AtlasEntity> hdfsPaths = getHdfsPathEntities();
for (AtlasEntity hdfsPath : hdfsPaths) {
String qualifiedName = (String) hdfsPath.getAttribute(ATTR_NAME_QUALIFIED_NAME);
boolean endsWithCl1 = qualifiedName.endsWith("@cl1");
applyTransforms(hdfsPath, handlers);
String transformedValue = (String) hdfsPath.getAttribute(ATTR_NAME_QUALIFIED_NAME);
if (endsWithCl1) {
assertTrue(transformedValue.endsWith("@CL1"), transformedValue + ": expected to end with @CL1");
} else {
assertEquals(qualifiedName, transformedValue, "not expected to change");
}
}
// Change clusterName back to lower case
AttributeTransform p2 = new AttributeTransform(Collections.singletonMap("hdfs_path.clusterName", "EQUALS: CL1"),
Collections.singletonMap("hdfs_path.clusterName", "TO_LOWER:"));
handlers = initializeHandlers(Collections.singletonList(p2));
for (AtlasEntity hdfsPath : hdfsPaths) {
String qualifiedName = (String) hdfsPath.getAttribute(ATTR_NAME_QUALIFIED_NAME);
boolean endsWithCL1 = qualifiedName.endsWith("@CL1");
applyTransforms(hdfsPath, handlers);
String transformedValue = (String) hdfsPath.getAttribute(ATTR_NAME_QUALIFIED_NAME);
if (endsWithCL1) {
assertTrue(transformedValue.endsWith("@cl1"), transformedValue + ": expected to end with @cl1");
} else {
assertEquals(qualifiedName, transformedValue, "not expected to change");
}
}
}
@Test
public void testHiveTableClearAttributeHandler() {
// clear replicatedTo attribute for hive_table entities
AttributeTransform p1 = new AttributeTransform(Collections.singletonMap("hive_table.replicatedTo", "HAS_VALUE:"),
Collections.singletonMap("hive_table.replicatedTo", "CLEAR:"));
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p1));
List<AtlasEntity> entities = getAllEntities();
for (AtlasEntity entity : entities) {
String replicatedTo = (String) entity.getAttribute("replicatedTo");
if (entity.getTypeName() == HIVE_TABLE) {
assertTrue(StringUtils.isNotEmpty(replicatedTo));
}
applyTransforms(entity, handlers);
String transformedValue = (String) entity.getAttribute("replicatedTo");
if (entity.getTypeName() == HIVE_TABLE) {
assertTrue(StringUtils.isEmpty(transformedValue));
}
}
}
@Test
public void testEntityClearAttributesActionWithNoCondition() {
// clear replicatedFrom attribute for hive_table entities without any condition
Map<String, String> actions = new HashMap<String, String>() {{ put("Referenceable.replicatedTo", "CLEAR:");
put("Referenceable.replicatedFrom", "CLEAR:"); }};
AttributeTransform transform = new AttributeTransform(null, actions);
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(transform));
List<AtlasEntity> entities = getAllEntities();
for (AtlasEntity entity : entities) {
String replicatedTo = (String) entity.getAttribute("replicatedTo");
String replicatedFrom = (String) entity.getAttribute("replicatedFrom");
if (entity.getTypeName() == HIVE_TABLE) {
assertTrue(StringUtils.isNotEmpty(replicatedTo));
assertTrue(StringUtils.isNotEmpty(replicatedFrom));
}
applyTransforms(entity, handlers);
replicatedTo = (String) entity.getAttribute("replicatedTo");
replicatedFrom = (String) entity.getAttribute("replicatedFrom");
if (entity.getTypeName() == HIVE_TABLE) {
assertTrue(StringUtils.isEmpty(replicatedTo));
assertTrue(StringUtils.isEmpty(replicatedFrom));
}
}
}
@Test
public void testEntityClearAttributesActionWithNoTypeNameAndNoCondition() {
// clear replicatedFrom attribute for hive_table entities without any condition
Map<String, String> actions = new HashMap<String, String>() {{ put("replicatedTo", "CLEAR:");
put("replicatedFrom", "CLEAR:"); }};
AttributeTransform transform = new AttributeTransform(null, actions);
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(transform));
List<AtlasEntity> entities = getAllEntities();
for (AtlasEntity entity : entities) {
String replicatedTo = (String) entity.getAttribute("replicatedTo");
String replicatedFrom = (String) entity.getAttribute("replicatedFrom");
if (entity.getTypeName() == HIVE_TABLE) {
assertTrue(StringUtils.isNotEmpty(replicatedTo));
assertTrue(StringUtils.isNotEmpty(replicatedFrom));
}
applyTransforms(entity, handlers);
replicatedTo = (String) entity.getAttribute("replicatedTo");
replicatedFrom = (String) entity.getAttribute("replicatedFrom");
if (entity.getTypeName() == HIVE_TABLE) {
assertTrue(StringUtils.isEmpty(replicatedTo));
assertTrue(StringUtils.isEmpty(replicatedFrom));
}
}
}
@Test
public void testHdfsPathNameReplacePrefixHandler() {
// Prefix replace hdfs_path name from /aa/bb/ to /xx/yy/
AttributeTransform p1 = new AttributeTransform(Collections.singletonMap("hdfs_path.name", "STARTS_WITH: /aa/bb/"),
Collections.singletonMap("hdfs_path.name", "REPLACE_PREFIX: = :/aa/bb/=/xx/yy/"));
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p1));
for (AtlasEntity hdfsPath : getHdfsPathEntities()) {
String name = (String) hdfsPath.getAttribute("name");
boolean startsWith_aa_bb_ = name.startsWith("/aa/bb/");
applyTransforms(hdfsPath, handlers);
String transformedValue = (String) hdfsPath.getAttribute("name");
if (startsWith_aa_bb_) {
assertTrue(transformedValue.startsWith("/xx/yy/"), transformedValue + ": expected to start with /xx/yy/");
} else {
assertEquals(name, transformedValue, "not expected to change");
}
}
}
@Test
public void testHiveDatabaseClusterRenameHandler() {
// replace clusterName: from cl1 to cl1_backup
AttributeTransform p1 = new AttributeTransform(Collections.singletonMap("hive_db.clusterName", "EQUALS: cl1"),
Collections.singletonMap("hive_db.clusterName", "SET: cl1_backup"));
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p1));
for (AtlasEntity entity : getAllEntities()) {
String qualifiedName = (String) entity.getAttribute(ATTR_NAME_QUALIFIED_NAME);
boolean isHdfsPath = StringUtils.equals(entity.getTypeName(), HDFS_PATH);
boolean endsWithCl1 = qualifiedName.endsWith("@cl1");
boolean containsCl1 = qualifiedName.contains("@cl1"); // for stroage_desc
applyTransforms(entity, handlers);
String transformedValue = (String) entity.getAttribute(ATTR_NAME_QUALIFIED_NAME);
if (!isHdfsPath && endsWithCl1) {
assertTrue(transformedValue.endsWith("@cl1_backup"), transformedValue + ": expected to end with @cl1_backup");
} else if (!isHdfsPath && containsCl1) {
assertTrue(transformedValue.contains("@cl1_backup"), transformedValue + ": expected to contains @cl1_backup");
} else {
assertEquals(qualifiedName, transformedValue, "not expected to change");
}
}
}
@Test
public void testHiveDatabaseNameRenameHandler() {
// replace dbName: from hr to hr_backup
AttributeTransform p = new AttributeTransform(Collections.singletonMap("hive_db.name", "EQUALS: hr"),
Collections.singletonMap("hive_db.name", "SET: hr_backup"));
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p));
for (AtlasEntity entity : getAllEntities()) {
String qualifiedName = (String) entity.getAttribute(ATTR_NAME_QUALIFIED_NAME);
boolean startsWithHrDot = qualifiedName.startsWith("hr."); // for tables, columns
boolean startsWithHrAt = qualifiedName.startsWith("hr@"); // for databases
applyTransforms(entity, handlers);
if (startsWithHrDot) {
assertTrue(((String) entity.getAttribute(ATTR_NAME_QUALIFIED_NAME)).startsWith("hr_backup."));
} else if (startsWithHrAt) {
assertTrue(((String) entity.getAttribute(ATTR_NAME_QUALIFIED_NAME)).startsWith("hr_backup@"));
} else {
assertEquals(qualifiedName, (String) entity.getAttribute(ATTR_NAME_QUALIFIED_NAME), "not expected to change");
}
}
}
@Test
public void testHiveTableNameRenameHandler() {
// replace tableName: from hr.employees to hr.employees_backup
AttributeTransform p = new AttributeTransform();
p.addCondition("hive_db.name", "EQUALS: hr");
p.addCondition("hive_table.name", "EQUALS: employees");
p.addAction("hive_table.name", "SET: employees_backup");
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p));
for (AtlasEntity entity : getAllEntities()) {
String qualifiedName = (String) entity.getAttribute(ATTR_NAME_QUALIFIED_NAME);
boolean startsWithHrEmployeesDot = qualifiedName.startsWith("hr.employees."); // for columns
boolean startsWithHrEmployeesAt = qualifiedName.startsWith("hr.employees@"); // for tables
applyTransforms(entity, handlers);
if (startsWithHrEmployeesDot) {
assertTrue(((String) entity.getAttribute(ATTR_NAME_QUALIFIED_NAME)).startsWith("hr.employees_backup."));
} else if (startsWithHrEmployeesAt) {
assertTrue(((String) entity.getAttribute(ATTR_NAME_QUALIFIED_NAME)).startsWith("hr.employees_backup@"));
} else {
assertEquals(qualifiedName, (String) entity.getAttribute(ATTR_NAME_QUALIFIED_NAME), "not expected to change");
}
}
}
@Test
public void testHiveColumnNameRenameHandler() {
// replace columnName: from hr.employees.age to hr.employees.age_backup
AttributeTransform p = new AttributeTransform();
p.addCondition("hive_db.name", "EQUALS: hr");
p.addCondition("hive_table.name", "EQUALS: employees");
p.addCondition("hive_column.name", "EQUALS: age");
p.addAction("hive_column.name", "SET: age_backup");
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p));
for (AtlasEntity entity : getAllEntities()) {
String qualifiedName = (String) entity.getAttribute(ATTR_NAME_QUALIFIED_NAME);
boolean startsWithHrEmployeesAgeAt = qualifiedName.startsWith("hr.employees.age@");
applyTransforms(entity, handlers);
if (startsWithHrEmployeesAgeAt) {
assertTrue(((String) entity.getAttribute(ATTR_NAME_QUALIFIED_NAME)).startsWith("hr.employees.age_backup@"));
} else {
assertEquals(qualifiedName, (String) entity.getAttribute(ATTR_NAME_QUALIFIED_NAME), "not expected to change");
}
}
}
@Test
public void verifyAddClassification() {
AtlasEntityTransformer transformer = new AtlasEntityTransformer(Collections.singletonMap("hive_db.qualifiedName", "EQUALS: hr@cl1"),
Collections.singletonMap("Referenceable.", "ADD_CLASSIFICATION: replicated"),
getTransformerContext());
List<BaseEntityHandler> handlers = Collections.singletonList(new BaseEntityHandler(Collections.singletonList(transformer)));
assertApplyTransform(handlers);
}
@Test
public void verifyAddClassificationUsingScope() {
AtlasExportRequest exportRequest = new AtlasExportRequest();
exportRequest.setItemsToExport(Collections.singletonList(new AtlasObjectId("hive_db", Collections.singletonMap(ATTR_NAME_QUALIFIED_NAME, "hr@cl1"))));
AtlasEntityTransformer transformer = new AtlasEntityTransformer(Collections.singletonMap("Referenceable.", "topLevel: "),
Collections.singletonMap("Referenceable", "ADD_CLASSIFICATION: replicated"),
new TransformerContext(getTypeRegistry(), null, exportRequest));
List<BaseEntityHandler> handlers = Collections.singletonList(new BaseEntityHandler(Collections.singletonList(transformer)));
assertApplyTransform(handlers);
}
@Test
public void verifyEntityTypeInAttributeName() {
AttributeTransform p = new AttributeTransform();
p.addAction("Asset.name", "SET: renamed");
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p));
AtlasEntity assetEntity = new AtlasEntity(TYPENAME_ASSET, "name", "originalName");
AtlasEntity assetSubEntity = new AtlasEntity(HIVE_DATABASE, "name", "originalName");
AtlasEntity nonAssetEntity = new AtlasEntity(TYPENAME_NON_ASSET, "name", "originalName");
applyTransforms(assetEntity, handlers);
applyTransforms(assetSubEntity, handlers);
applyTransforms(nonAssetEntity, handlers);
assertEquals((String) assetEntity.getAttribute("name"), "renamed", "Asset.name expected to be updated for Asset entity");
assertEquals((String) assetSubEntity.getAttribute("name"), "renamed", "Asset.name expected to be updated for Asset sub-type entity");
assertEquals((String) nonAssetEntity.getAttribute("name"), "originalName", "Asset.name expected to be not updated for non-Asset type entity");
}
@Test
public void verifyNoEntityTypeInAttributeName() {
AttributeTransform p = new AttributeTransform();
p.addAction("name", "SET: renamed");
List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p));
AtlasEntity assetEntity = new AtlasEntity(TYPENAME_ASSET, "name", "originalName");
AtlasEntity assetSubEntity = new AtlasEntity(HIVE_DATABASE, "name", "originalName");
AtlasEntity nonAssetEntity = new AtlasEntity(TYPENAME_NON_ASSET, "name", "originalName");
applyTransforms(assetEntity, handlers);
applyTransforms(assetSubEntity, handlers);
applyTransforms(nonAssetEntity, handlers);
assertEquals((String) assetEntity.getAttribute("name"), "renamed", "name expected to be updated for Asset entity");
assertEquals((String) assetSubEntity.getAttribute("name"), "renamed", "name expected to be updated for Asset sub-type entity");
assertEquals((String) nonAssetEntity.getAttribute("name"), "renamed", "name expected to be not updated for non-Asset type entity");
}
@Test
public void qualifiedNameDifferentFromName() throws IOException {
final String expectedQualifiedName = "test_partition_bootstrap_target.temptable_temp-5e93084a-f2d1-46f7-8f8f-30ffb859d2be@mycluster1";
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = readObjectFromJson("entity1", AtlasEntity.AtlasEntityWithExtInfo.class);
List<BaseEntityHandler> handlers = BaseEntityHandler.fromJson(getStringFromFile("transform1"), null);
assertNotNull(handlers);
assertEquals(handlers.size(), 4);
assertNotNull(entityWithExtInfo);
applyTransforms(entityWithExtInfo.getEntity(), handlers);
assertEquals(entityWithExtInfo.getEntity().getAttribute(ATTR_NAME_QUALIFIED_NAME), expectedQualifiedName);
}
@Test
public void emptyQualifiedNameHandling() throws IOException {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = readObjectFromJson("entity1", AtlasEntity.AtlasEntityWithExtInfo.class);
entityWithExtInfo.getEntity().setAttribute(QUALIFIED_NAME_ATTRIBUTE, "");
List<BaseEntityHandler> handlers = BaseEntityHandler.fromJson("[{\"conditions\":{\"hive_db.clusterName\":\"EQUALS: mycluster0\"},\"action\":{\"hive_db.clusterName\":\"SET: mycluster1\"}}]", null);
assertNotNull(handlers);
assertEquals(handlers.size(), 4);
assertNotNull(entityWithExtInfo);
applyTransforms(entityWithExtInfo.getEntity(), handlers);
assertEquals(entityWithExtInfo.getEntity().getAttribute(ATTR_NAME_QUALIFIED_NAME), "");
}
private void assertApplyTransform(List<BaseEntityHandler> handlers) {
for (AtlasEntity entity : getAllEntities()) {
applyTransforms(entity, handlers);
if(entity.getTypeName().equals("hive_db") && entity.getAttribute(ATTR_NAME_QUALIFIED_NAME).equals("hr@cl1")) {
assertNotNull(entity.getClassifications());
} else{
assertNull(entity.getClassifications());
}
}
}
private List<BaseEntityHandler> initializeHandlers(List<AttributeTransform> params) {
return BaseEntityHandler.createEntityHandlers(params, getTransformerContext());
}
private void applyTransforms(AtlasEntity entity, List<BaseEntityHandler> handlers) {
for (BaseEntityHandler handler : handlers) {
handler.transform(entity);
}
}
private TransformerContext getTransformerContext() {
return new TransformerContext(getTypeRegistry(), null, null);
}
private AtlasTypeRegistry getTypeRegistry() {
AtlasTypeRegistry ret = new AtlasTypeRegistry();
AtlasEntityDef defReferenceable = new AtlasEntityDef(TYPENAME_REFERENCEABLE);
AtlasEntityDef defAsset = new AtlasEntityDef(TYPENAME_ASSET);
AtlasEntityDef defHdfsPath = new AtlasEntityDef(HDFS_PATH);
AtlasEntityDef defHiveDb = new AtlasEntityDef(HIVE_DATABASE);
AtlasEntityDef defHiveTable = new AtlasEntityDef(HIVE_TABLE);
AtlasEntityDef defHiveColumn = new AtlasEntityDef(HIVE_COLUMN);
AtlasEntityDef defHiveStorDesc = new AtlasEntityDef(HIVE_STORAGE_DESCRIPTOR);
AtlasEntityDef defNonAsset = new AtlasEntityDef(TYPENAME_NON_ASSET);
defAsset.addSuperType(TYPENAME_REFERENCEABLE);
defHdfsPath.addSuperType(TYPENAME_ASSET);
defHiveDb.addSuperType(TYPENAME_ASSET);
defHiveTable.addSuperType(TYPENAME_ASSET);
defHiveColumn.addSuperType(TYPENAME_ASSET);
defNonAsset.addSuperType(TYPENAME_REFERENCEABLE);
AtlasTypesDef typesDef = new AtlasTypesDef();
typesDef.setEntityDefs(Arrays.asList(defReferenceable, defAsset, defHdfsPath, defHiveDb, defHiveTable, defHiveColumn, defHiveStorDesc, defNonAsset));
try {
AtlasTypeRegistry.AtlasTransientTypeRegistry ttr = ret.lockTypeRegistryForUpdate();
ttr.addTypes(typesDef);
ret.releaseTypeRegistryForUpdate(ttr, true);
} catch (AtlasBaseException excp) {
LOG.warn("failed to initialize type-registry", excp);
}
return ret;
}
private List<AtlasEntity> getHdfsPathEntities() {
List<AtlasEntity> ret = new ArrayList<>();
for (String clusterName : CLUSTER_NAMES) {
ret.add(getHdfsPathEntity1(clusterName));
ret.add(getHdfsPathEntity2(clusterName));
}
return ret;
}
private List<AtlasEntity> getAllEntities() {
List<AtlasEntity> ret = new ArrayList<>();
for (String clusterName : CLUSTER_NAMES) {
ret.add(getHdfsPathEntity1(clusterName));
ret.add(getHdfsPathEntity2(clusterName));
for (String databaseName : DATABASE_NAMES) {
ret.add(getHiveDbEntity(clusterName, databaseName));
for (String tableName : TABLE_NAMES) {
ret.add(getHiveTableEntity(clusterName, databaseName, tableName));
ret.add(getHiveStorageDescriptorEntity(clusterName, databaseName, tableName));
for (String columnName : COLUMN_NAMES) {
ret.add(getHiveColumnEntity(clusterName, databaseName, tableName, columnName));
}
}
}
}
return ret;
}
private AtlasEntity getHdfsPathEntity1(String clusterName) {
AtlasEntity entity = new AtlasEntity(HDFS_PATH);
entity.setAttribute("name", "/aa/bb/employee");
entity.setAttribute("path", "hdfs://localhost.localdomain:8020/aa/bb/employee");
entity.setAttribute(ATTR_NAME_QUALIFIED_NAME, "hdfs://localhost.localdomain:8020/aa/bb/employee@" + clusterName);
entity.setAttribute("clusterName", clusterName);
entity.setAttribute("isSymlink", false);
entity.setAttribute("modifiedTime", 0);
entity.setAttribute("isFile", false);
entity.setAttribute("numberOfReplicas", 0);
entity.setAttribute("createTime", 0);
entity.setAttribute("fileSize", 0);
return entity;
}
private AtlasEntity getHdfsPathEntity2(String clusterName) {
AtlasEntity entity = new AtlasEntity(HDFS_PATH);
entity.setAttribute("name", "/cc/dd/employee");
entity.setAttribute("path", "hdfs://localhost.localdomain:8020/cc/dd/employee");
entity.setAttribute(ATTR_NAME_QUALIFIED_NAME, "hdfs://localhost.localdomain:8020/cc/dd/employee@" + clusterName);
entity.setAttribute("clusterName", clusterName);
entity.setAttribute("isSymlink", false);
entity.setAttribute("modifiedTime", 0);
entity.setAttribute("isFile", false);
entity.setAttribute("numberOfReplicas", 0);
entity.setAttribute("createTime", 0);
entity.setAttribute("fileSize", 0);
return entity;
}
private AtlasEntity getHiveDbEntity(String clusterName, String dbName) {
AtlasEntity entity = new AtlasEntity(TransformationConstants.HIVE_DATABASE);
entity.setAttribute("name", dbName);
entity.setAttribute(ATTR_NAME_QUALIFIED_NAME, dbName + "@" + clusterName);
entity.setAttribute("location", "hdfs://localhost.localdomain:8020/warehouse/tablespace/managed/hive/" + dbName + ".db");
entity.setAttribute("clusterName", clusterName);
entity.setAttribute("owner", "hive");
entity.setAttribute("ownerType", "USER");
return entity;
}
private AtlasEntity getHiveTableEntity(String clusterName, String dbName, String tableName) {
String qualifiedName = dbName + "." + tableName + "@" + clusterName;
AtlasEntity entity = new AtlasEntity(TransformationConstants.HIVE_TABLE);
entity.setAttribute("name", tableName);
entity.setAttribute(ATTR_NAME_QUALIFIED_NAME, qualifiedName);
entity.setAttribute("owner", "hive");
entity.setAttribute("temporary", false);
entity.setAttribute("lastAccessTime", "1535656355000");
entity.setAttribute("tableType", "EXTERNAL_TABLE");
entity.setAttribute("createTime", "1535656355000");
entity.setAttribute("retention", 0);
entity.setAttribute("replicatedTo", "[{\"guid\":\"f378cfa5-c4aa-4699-a733-8f11d2f089cd\",\"typeName\":\"AtlasServer\"},{\"guid\":\"58e42789-ea3e-4eaa-a0c4-d38d8632e548\",\"typeName\":\"AtlasServer\"}]");
entity.setAttribute("replicatedFrom", "[{\"guid\":\"f378cfa5-c4aa-4699-a733-8f11d2f089cd\",\"typeName\":\"AtlasServer\"},{\"guid\":\"58e42789-ea3e-4eaa-a0c4-d38d8632e548\",\"typeName\":\"AtlasServer\"}]");
return entity;
}
private AtlasEntity getHiveStorageDescriptorEntity(String clusterName, String dbName, String tableName) {
String qualifiedName = "hdfs://localhost.localdomain:8020/warehouse/tablespace/managed/hive/" + dbName + ".db" + "/" + tableName;
AtlasEntity entity = new AtlasEntity(TransformationConstants.HIVE_STORAGE_DESCRIPTOR);
entity.setAttribute(ATTR_NAME_QUALIFIED_NAME, dbName + "." + tableName + "@" + clusterName + "_storage");
entity.setAttribute("storedAsSubDirectories", false);
entity.setAttribute("location", qualifiedName);
entity.setAttribute("compressed", false);
entity.setAttribute("inputFormat", "org.apache.hadoop.mapred.TextInputFormat");
entity.setAttribute("outputFormat", "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat");
entity.setAttribute("numBuckets", -1);
return entity;
}
private AtlasEntity getHiveColumnEntity(String clusterName, String dbName, String tableName, String columnName) {
String qualifiedName = dbName + "." + tableName + "." + columnName + "@" + clusterName;
AtlasEntity entity = new AtlasEntity(TransformationConstants.HIVE_COLUMN);
entity.setAttribute("owner", "hive");
entity.setAttribute(ATTR_NAME_QUALIFIED_NAME, qualifiedName);
entity.setAttribute("name", columnName);
entity.setAttribute("position", 1);
entity.setAttribute("type", "string");
return entity;
}
private AtlasEntity getNonAssetEntity() {
return new AtlasEntity(TYPENAME_NON_ASSET);
}
public static <T> T readObjectFromJson(String filename, Class<T> objectClass) throws IOException {
String json = getStringFromFile(filename);
return AtlasType.fromJson(json, objectClass);
}
private static String getStringFromFile(String filename) throws IOException {
final String userDir = System.getProperty("user.dir");
return FileUtils.readFileToString(new File(getTestJsonPath(userDir, filename)));
}
private static String getTestJsonPath(String startPath, String fileName) {
return startPath + "/src/test/resources/json/" + fileName + ".json";
}
}