blob: 34abeab6a2b82e904f79a22d9e3ea23f87e0c5b1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.resources;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.EnumTypeDefinition;
import org.apache.atlas.typesystem.types.EnumValue;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.util.List;
/**
* Base class for integration tests.
* Sets up the web resource and has helper methods to create type and entity.
*/
public abstract class BaseResourceIT {
public static final String ATLAS_REST_ADDRESS = "atlas.rest.address";
protected WebResource service;
protected AtlasClient serviceClient;
public static final Logger LOG = LoggerFactory.getLogger(BaseResourceIT.class);
protected static final int MAX_WAIT_TIME = 1000;
@BeforeClass
public void setUp() throws Exception {
DefaultClientConfig config = new DefaultClientConfig();
Client client = Client.create(config);
Configuration configuration = ApplicationProperties.get();
String baseUrl = configuration.getString(ATLAS_REST_ADDRESS, "http://localhost:21000/");
client.resource(UriBuilder.fromUri(baseUrl).build());
service = client.resource(UriBuilder.fromUri(baseUrl).build());
serviceClient = new AtlasClient(baseUrl);
}
protected void createType(TypesDef typesDef) throws Exception {
HierarchicalTypeDefinition<ClassType> sampleType = typesDef.classTypesAsJavaList().get(0);
try {
serviceClient.getType(sampleType.typeName);
LOG.info("Types already exist. Skipping type creation");
} catch(AtlasServiceException ase) {
//Expected if type doesnt exist
String typesAsJSON = TypesSerialization.toJson(typesDef);
createType(typesAsJSON);
}
}
protected void createType(String typesAsJSON) throws Exception {
WebResource resource = service.path("api/atlas/types");
ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
.method(HttpMethod.POST, ClientResponse.class, typesAsJSON);
Assert.assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode());
String responseAsString = clientResponse.getEntity(String.class);
Assert.assertNotNull(responseAsString);
JSONObject response = new JSONObject(responseAsString);
Assert.assertNotNull(response.get("types"));
Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID));
}
protected Id createInstance(Referenceable referenceable) throws Exception {
String typeName = referenceable.getTypeName();
System.out.println("creating instance of type " + typeName);
String entityJSON = InstanceSerialization.toJson(referenceable, true);
System.out.println("Submitting new entity= " + entityJSON);
JSONArray guids = serviceClient.createEntity(entityJSON);
System.out.println("created instance for type " + typeName + ", guid: " + guids);
// return the reference to created instance with guid
if (guids.length() > 0) {
return new Id(guids.getString(guids.length() - 1), 0, referenceable.getTypeName());
}
return null;
}
protected static final String DATABASE_TYPE = "hive_db";
protected static final String HIVE_TABLE_TYPE = "hive_table";
protected static final String COLUMN_TYPE = "hive_column";
protected static final String HIVE_PROCESS_TYPE = "hive_process";
protected void createTypeDefinitions() throws Exception {
HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
.createClassTypeDef(DATABASE_TYPE, null,
TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
TypesUtil.createRequiredAttrDef("description", DataTypes.STRING_TYPE),
attrDef("locationUri", DataTypes.STRING_TYPE),
attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.INT_TYPE));
HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil
.createClassTypeDef(COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
attrDef("dataType", DataTypes.STRING_TYPE), attrDef("comment", DataTypes.STRING_TYPE));
StructTypeDefinition structTypeDefinition = new StructTypeDefinition("serdeType",
new AttributeDefinition[]{TypesUtil.createRequiredAttrDef("name", DataTypes.STRING_TYPE),
TypesUtil.createRequiredAttrDef("serde", DataTypes.STRING_TYPE)});
EnumValue values[] = {new EnumValue("MANAGED", 1), new EnumValue("EXTERNAL", 2),};
EnumTypeDefinition enumTypeDefinition = new EnumTypeDefinition("tableType", values);
HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
.createClassTypeDef(HIVE_TABLE_TYPE, ImmutableList.of("DataSet"),
attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE),
attrDef("lastAccessTime", DataTypes.DATE_TYPE),
attrDef("temporary", DataTypes.BOOLEAN_TYPE),
new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, true, null),
new AttributeDefinition("columns", DataTypes.arrayTypeName(COLUMN_TYPE),
Multiplicity.OPTIONAL, true, null),
new AttributeDefinition("tableType", "tableType", Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("serde1", "serdeType", Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("serde2", "serdeType", Multiplicity.OPTIONAL, false, null));
HierarchicalTypeDefinition<ClassType> loadProcessClsDef = TypesUtil
.createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableList.of("Process"),
attrDef("userName", DataTypes.STRING_TYPE), attrDef("startTime", DataTypes.INT_TYPE),
attrDef("endTime", DataTypes.LONG_TYPE),
attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryGraph", DataTypes.STRING_TYPE, Multiplicity.REQUIRED));
HierarchicalTypeDefinition<TraitType> classificationTrait = TypesUtil
.createTraitTypeDef("classification", ImmutableList.<String>of(),
TypesUtil.createRequiredAttrDef("tag", DataTypes.STRING_TYPE));
HierarchicalTypeDefinition<TraitType> piiTrait =
TypesUtil.createTraitTypeDef("pii", ImmutableList.<String>of());
HierarchicalTypeDefinition<TraitType> phiTrait =
TypesUtil.createTraitTypeDef("phi", ImmutableList.<String>of());
HierarchicalTypeDefinition<TraitType> pciTrait =
TypesUtil.createTraitTypeDef("pci", ImmutableList.<String>of());
HierarchicalTypeDefinition<TraitType> soxTrait =
TypesUtil.createTraitTypeDef("sox", ImmutableList.<String>of());
HierarchicalTypeDefinition<TraitType> secTrait =
TypesUtil.createTraitTypeDef("sec", ImmutableList.<String>of());
HierarchicalTypeDefinition<TraitType> financeTrait =
TypesUtil.createTraitTypeDef("finance", ImmutableList.<String>of());
HierarchicalTypeDefinition<TraitType> dimTraitDef = TypesUtil.createTraitTypeDef("Dimension", null);
HierarchicalTypeDefinition<TraitType> factTraitDef = TypesUtil.createTraitTypeDef("Fact", null);
HierarchicalTypeDefinition<TraitType> metricTraitDef = TypesUtil.createTraitTypeDef("Metric", null);
HierarchicalTypeDefinition<TraitType> etlTraitDef = TypesUtil.createTraitTypeDef("ETL", null);
TypesDef typesDef = TypesUtil.getTypesDef(ImmutableList.of(enumTypeDefinition),
ImmutableList.of(structTypeDefinition),
ImmutableList.of(classificationTrait, piiTrait, phiTrait, pciTrait, soxTrait, secTrait, financeTrait,
dimTraitDef, factTraitDef, metricTraitDef, etlTraitDef),
ImmutableList.of(dbClsDef, columnClsDef, tblClsDef, loadProcessClsDef));
createType(typesDef);
}
AttributeDefinition attrDef(String name, IDataType dT) {
return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
}
AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m) {
return attrDef(name, dT, m, false, null);
}
AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m, boolean isComposite,
String reverseAttributeName) {
Preconditions.checkNotNull(name);
Preconditions.checkNotNull(dT);
return new AttributeDefinition(name, dT.getName(), m, isComposite, reverseAttributeName);
}
protected String randomString() {
return RandomStringUtils.randomAlphanumeric(10);
}
protected Referenceable createHiveTableInstance(String dbName, String tableName) throws Exception {
Referenceable databaseInstance = new Referenceable(DATABASE_TYPE);
databaseInstance.set("name", dbName);
databaseInstance.set("description", "foo database");
Referenceable tableInstance =
new Referenceable(HIVE_TABLE_TYPE, "classification", "pii", "phi", "pci", "sox", "sec", "finance");
tableInstance.set("name", tableName);
tableInstance.set("db", databaseInstance);
tableInstance.set("description", "bar table");
tableInstance.set("lastAccessTime", "2014-07-11T08:00:00.000Z");
tableInstance.set("type", "managed");
tableInstance.set("level", 2);
tableInstance.set("tableType", 1); // enum
tableInstance.set("compressed", false);
Struct traitInstance = (Struct) tableInstance.getTrait("classification");
traitInstance.set("tag", "foundation_etl");
Struct serde1Instance = new Struct("serdeType");
serde1Instance.set("name", "serde1");
serde1Instance.set("serde", "serde1");
tableInstance.set("serde1", serde1Instance);
Struct serde2Instance = new Struct("serdeType");
serde2Instance.set("name", "serde2");
serde2Instance.set("serde", "serde2");
tableInstance.set("serde2", serde2Instance);
List<String> traits = tableInstance.getTraits();
Assert.assertEquals(traits.size(), 7);
return tableInstance;
}
public interface Predicate {
/**
* Perform a predicate evaluation.
*
* @return the boolean result of the evaluation.
* @throws Exception thrown if the predicate evaluation could not evaluate.
*/
boolean evaluate() throws Exception;
}
public interface NotificationPredicate {
/**
* Perform a predicate evaluation.
*
* @return the boolean result of the evaluation.
* @throws Exception thrown if the predicate evaluation could not evaluate.
*/
boolean evaluate(EntityNotification notification) throws Exception;
}
/**
* Wait for a condition, expressed via a {@link Predicate} to become true.
*
* @param timeout maximum time in milliseconds to wait for the predicate to become true.
* @param predicate predicate waiting on.
*/
protected void waitFor(int timeout, Predicate predicate) throws Exception {
ParamChecker.notNull(predicate, "predicate");
long mustEnd = System.currentTimeMillis() + timeout;
boolean eval;
while (!(eval = predicate.evaluate()) && System.currentTimeMillis() < mustEnd) {
LOG.info("Waiting up to {} msec", mustEnd - System.currentTimeMillis());
Thread.sleep(100);
}
if (!eval) {
throw new Exception("Waiting timed out after " + timeout + " msec");
}
}
protected EntityNotification waitForNotification(final NotificationConsumer<EntityNotification> consumer, int maxWait,
final NotificationPredicate predicate) throws Exception {
final TypeUtils.Pair<EntityNotification, String> pair = TypeUtils.Pair.of(null, null);
final long maxCurrentTime = System.currentTimeMillis() + maxWait;
waitFor(maxWait, new Predicate() {
@Override
public boolean evaluate() throws Exception {
try {
while (consumer.hasNext() && System.currentTimeMillis() < maxCurrentTime) {
EntityNotification notification = consumer.next();
if (predicate.evaluate(notification)) {
pair.left = notification;
return true;
}
}
} catch(ConsumerTimeoutException e) {
//ignore
}
return false;
}
});
return pair.left;
}
protected NotificationPredicate newNotificationPredicate(final EntityNotification.OperationType operationType,
final String typeName, final String guid) {
return new NotificationPredicate() {
@Override
public boolean evaluate(EntityNotification notification) throws Exception {
return notification != null &&
notification.getOperationType() == operationType &&
notification.getEntity().getTypeName().equals(typeName) &&
notification.getEntity().getId()._getId().equals(guid);
}
};
}
}