Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-atlas into branch-0.6-incubating
diff --git a/distro/src/conf/application.properties b/distro/src/conf/application.properties
index f9888bd..cb6ee31 100755
--- a/distro/src/conf/application.properties
+++ b/distro/src/conf/application.properties
@@ -57,6 +57,7 @@
atlas.kafka.zookeeper.session.timeout.ms=400
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
+atlas.kafka.auto.offset.reset=smallest
atlas.kafka.hook.group.id=atlas
diff --git a/docs/src/site/twiki/Bridge-Hive.twiki b/docs/src/site/twiki/Bridge-Hive.twiki
index 0c7732b..c8e9009 100644
--- a/docs/src/site/twiki/Bridge-Hive.twiki
+++ b/docs/src/site/twiki/Bridge-Hive.twiki
@@ -19,12 +19,12 @@
hive_process(ClassType) - super types [Process] - attributes [startTime, endTime, userName, operationType, queryText, queryPlan, queryId, queryGraph]
</verbatim>
-The entities are created and de-duped using unique qualified name. They provide namespace and can be used for querying/lineage as well. Note that dbName and tableName should be in lower case. clusterName is explained below:
-hive_db - attribute qualifiedName - <dbName>@<clusterName>
-hive_table - attribute name - <dbName>.<tableName>@<clusterName>
-hive_column - attribute qualifiedName - <dbName>.<tableName>.<columnName>@<clusterName>
-hive_partition - attribute qualifiedName - <dbName>.<tableName>.<partitionValues('-' separated)>@<clusterName>
-hive_process - attribute name - <queryString> - trimmed query string in lower case
+The entities are created and de-duped using unique qualified name. They provide namespace and can be used for querying/lineage as well. Note that dbName and tableName should be in lower case. clusterName is explained below.
+ * hive_db - attribute qualifiedName - <dbName>@<clusterName>
+ * hive_table - attribute name - <dbName>.<tableName>@<clusterName>
+ * hive_column - attribute qualifiedName - <dbName>.<tableName>.<columnName>@<clusterName>
+ * hive_partition - attribute qualifiedName - <dbName>.<tableName>.<partitionValues('-' separated)>@<clusterName>
+ * hive_process - attribute name - <queryString> - trimmed query string in lower case
---++ Importing Hive Metadata
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index 4e13d61..8b942ab 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -53,8 +53,7 @@
echo "grant 'atlas', 'RWXCA', 'titan'" | hbase shell
</verbatim>
----++++ Graph Search Index
-
+---+++ Graph Search Index
This section sets up the graph db - titan - to use an search indexing system. The example
configuration below sets up to use an embedded Elastic search indexing system.
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
index d4e07c0..1f05df4 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
@@ -69,4 +69,10 @@
consumerId, message.topic(), message.partition(), message.offset(), message.message());
return (String) message.message();
}
+
+ @Override
+ protected String peekMessage() {
+ MessageAndMetadata message = (MessageAndMetadata) iterator.peek();
+ return (String) message.message();
+ }
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
index 42a4e7f..b6a9d7b 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
@@ -94,10 +94,11 @@
}
@Override
- public void remove() {
- throw new UnsupportedOperationException("The remove method is not supported.");
+ public T peek() {
+ return GSON.fromJson(peekMessage(), type);
}
+ protected abstract String peekMessage();
// ----- inner class : ImmutableListDeserializer ---------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 346ec3e..d2da975 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -17,8 +17,11 @@
package org.apache.atlas.notification;
-import java.util.Iterator;
-
// TODO : docs!
-public interface NotificationConsumer<T> extends Iterator<T>{
+public interface NotificationConsumer<T>{
+ boolean hasNext();
+
+ T next();
+
+ T peek();
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 6876758..3352cd0 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -19,10 +19,10 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
-import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
@@ -62,7 +62,7 @@
executors = Executors.newFixedThreadPool(consumers.size());
for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : consumers) {
- executors.submit(new HookConsumer(atlasClient, consumer));
+ executors.submit(new HookConsumer(consumer));
}
}
@@ -89,12 +89,23 @@
private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
private final AtlasClient client;
- public HookConsumer(AtlasClient atlasClient,
- NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
- this.client = atlasClient;
+ public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
+ this(atlasClient, consumer);
+ }
+
+ public HookConsumer(AtlasClient client, NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
+ this.client = client;
this.consumer = consumer;
}
+ private boolean hasNext() {
+ try {
+ return consumer.hasNext();
+ } catch(ConsumerTimeoutException e) {
+ return false;
+ }
+ }
+
@Override
public void run() {
@@ -102,33 +113,39 @@
return;
}
- while(consumer.hasNext()) {
- HookNotification.HookNotificationMessage message = consumer.next();
-
+ while(true) {
try {
- switch (message.getType()) {
- case ENTITY_CREATE:
- HookNotification.EntityCreateRequest createRequest =
- (HookNotification.EntityCreateRequest) message;
- atlasClient.createEntity(createRequest.getEntities());
- break;
+ if (hasNext()) {
+ HookNotification.HookNotificationMessage message = consumer.next();
+ try {
+ switch (message.getType()) {
+ case ENTITY_CREATE:
+ HookNotification.EntityCreateRequest createRequest =
+ (HookNotification.EntityCreateRequest) message;
+ atlasClient.createEntity(createRequest.getEntities());
+ break;
- case ENTITY_PARTIAL_UPDATE:
- HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
- (HookNotification.EntityPartialUpdateRequest) message;
- atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
- partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue(),
- partialUpdateRequest.getEntity());
- break;
+ case ENTITY_PARTIAL_UPDATE:
+ HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
+ (HookNotification.EntityPartialUpdateRequest) message;
+ atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
+ partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue(),
+ partialUpdateRequest.getEntity());
+ break;
- case ENTITY_FULL_UPDATE:
- HookNotification.EntityUpdateRequest updateRequest =
- (HookNotification.EntityUpdateRequest) message;
- atlasClient.updateEntities(updateRequest.getEntities());
- break;
+ case ENTITY_FULL_UPDATE:
+ HookNotification.EntityUpdateRequest updateRequest =
+ (HookNotification.EntityUpdateRequest) message;
+ atlasClient.updateEntities(updateRequest.getEntities());
+ break;
+ }
+ } catch (Exception e) {
+ //todo handle failures
+ LOG.warn("Error handling message {}", message, e);
+ }
}
- } catch (Exception e) {
- LOG.debug("Error handling message {}", message, e);
+ } catch(Throwable t) {
+ LOG.warn("Failure in NotificationHookConsumer", t);
}
}
}
@@ -146,7 +163,7 @@
return false;
}
}
- } catch (AtlasServiceException e) {
+ } catch (Throwable e) {
LOG.info(
"Handled AtlasServiceException while waiting for Atlas Server to become ready, " +
"exiting consumer thread.", e);
diff --git a/release-log.txt b/release-log.txt
index 6866bfd..0588ff9 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,19 +1,25 @@
Apache Atlas Release Notes
==========================
---Release 0.6-incubating
+--trunk - unreleased
+INCOMPATIBLE CHANGES:
+ALL CHANGES:
+
+
+--Release 0.6-incubating
INCOMPATIBLE CHANGES:
ATLAS-58 Make hive hook reliable (shwethags)
ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
ALL CHANGES:
+ATLAS-394 Fix BaseResourceIT.waitForNotification (shwethags via sumasai)
ATLAS-385 Support for Lineage for entities with SuperType as DataSet (anilsg via sumasai)
ATLAS-342 Atlas is sending an ENTITY_CREATE event to the ATLAS_ENTITIES topic even if the entity exists already (shwethags)
ATLAS-386 Handle hive rename Table (shwethags)
-ATLAS-374 Doc: Create a wiki for documenting fault tolerance and HA options for Atlas data (yhemath via sumasai)
-ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown(yhemath via sumasai)
+ATLAS-374 Doc: Create a wiki for documenting fault tolerance and HA options for Atlas data (yhemanth via sumasai)
+ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)
ATLAS-382 Fixed Hive Bridge doc for ATLAS cluster name (sumasai)
ATLAS-244 UI: Add Tag Tab (darshankumar89 via sumasai)
ATLAS-376 UI: Use the Schema API of the backend to populate details for Schema tab (darshankumar89 via sumasai)
@@ -22,8 +28,8 @@
ATLAS-47 Entity mutations for complex types (sumasai via shwethags)
ATLAS-345 UI: Should allow tag addition on any search result that returns a reference-able entity (darshankumar89 via shwethags)
ATLAS-279 UI not displaying results for certain successful "select" search queries (anilsg via shwethags)
-ATLAS-361 Add validation when index backends are switched in ATLAS configuration(sumasai via shwethags)
ATLAS-242 The qualified name for hive entities should be backward compatible (shwethags)
+ATLAS-361 Add validation when index backends are switched in ATLAS configuration (sumasai via shwethags)
ATLAS-171 Ability to update type definition(shwethags via sumasai)
ATLAS-352 Improve write performance on type and entity creation with Hbase (sumasai)
ATLAS-350 Document jaas config details for atlas (tbeerbower via shwethags)
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index 6b2d5d1..9955f07 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -252,7 +252,7 @@
*/
public Vertex getVertexForInstanceByUniqueAttribute(ClassType classType, IReferenceableInstance instance)
throws AtlasException {
-
+ LOG.debug("Checking if there is an instance with the same unique attributes for instance {}", instance);
Vertex result = null;
for (AttributeInfo attributeInfo : classType.fieldMapping().fields.values()) {
if (attributeInfo.isUnique) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java b/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java
index 26d5e2b..6ab7a21 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java
@@ -22,9 +22,7 @@
import com.google.inject.Provides;
import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;
-import com.thinkaurelius.titan.core.TitanTransaction;
import com.thinkaurelius.titan.core.schema.TitanManagement;
-import com.thinkaurelius.titan.diskstorage.Backend;
import com.thinkaurelius.titan.diskstorage.StandardIndexProvider;
import com.thinkaurelius.titan.diskstorage.indexing.IndexInformation;
import com.thinkaurelius.titan.diskstorage.solr.Solr5Index;
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
index 996f31b..2f3eb30 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
@@ -231,12 +231,14 @@
List<ITypedReferenceableInstance> instancesToUpdate = new ArrayList<>();
for (IReferenceableInstance instance : instances) {
+ LOG.debug("Discovering instance to create/update for {}", instance);
ITypedReferenceableInstance newInstance;
Id id = instance.getId();
if (!idToVertexMap.containsKey(id)) {
Vertex instanceVertex;
if (id.isAssigned()) { // has a GUID
+ LOG.debug("Instance {} has an assigned id", instance.getId()._getId());
instanceVertex = graphHelper.getVertexForGUID(id.id);
if (!(instance instanceof ReferenceableInstance)) {
throw new IllegalStateException(
@@ -252,6 +254,7 @@
//no entity with the given unique attribute, create new
if (instanceVertex == null) {
+ LOG.debug("Creating new vertex for instance {}", instance);
newInstance = classType.convert(instance, Multiplicity.REQUIRED);
instanceVertex = graphHelper.createVertexWithIdentity(newInstance, classType.getAllSuperTypeNames());
instancesToCreate.add(newInstance);
@@ -260,6 +263,7 @@
mapInstanceToVertex(newInstance, instanceVertex, classType.fieldMapping().fields, true, Operation.CREATE);
} else {
+ LOG.debug("Re-using existing vertex {} for instance {}", instanceVertex.getId(), instance);
if (!(instance instanceof ReferenceableInstance)) {
throw new IllegalStateException(
String.format("%s is not of type ITypedReferenceableInstance", instance));
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java
index d824b50..6fc7008 100644
--- a/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java
@@ -19,9 +19,12 @@
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.util.TitanCleanup;
+import com.thinkaurelius.titan.diskstorage.Backend;
+import com.thinkaurelius.titan.graphdb.database.StandardTitanGraph;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeTest;
diff --git a/server-api/pom.xml b/server-api/pom.xml
index c7488d5..88de030 100644
--- a/server-api/pom.xml
+++ b/server-api/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>apache-atlas</artifactId>
<groupId>org.apache.atlas</groupId>
- <version>0.6-incubating</version>
+ <version>0.6-incubating-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/typesystem/src/main/resources/application.properties b/typesystem/src/main/resources/application.properties
index d475d7e..702c6f2 100644
--- a/typesystem/src/main/resources/application.properties
+++ b/typesystem/src/main/resources/application.properties
@@ -66,6 +66,7 @@
atlas.kafka.data=${sys:atlas.data}/kafka
atlas.kafka.zookeeper.session.timeout.ms=400
atlas.kafka.zookeeper.sync.time.ms=20
+atlas.kafka.consumer.timeout.ms=100
atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
index b2501ec..cd4e743 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
@@ -35,10 +35,8 @@
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.web.resources.BaseResourceIT;
import org.apache.atlas.web.util.Servlets;
-import org.junit.AfterClass;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@@ -48,7 +46,9 @@
import java.util.LinkedList;
import java.util.List;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
/**
* Entity Notification Integration Tests.
@@ -62,9 +62,9 @@
private final String TABLE_NAME = "table" + randomString();
@Inject
private NotificationInterface notificationInterface;
- private EntityNotificationConsumer notificationConsumer;
private Id tableId;
private String traitName;
+ private NotificationConsumer<EntityNotification> notificationConsumer;
@BeforeClass
public void setUp() throws Exception {
@@ -74,19 +74,7 @@
List<NotificationConsumer<EntityNotification>> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
- NotificationConsumer<EntityNotification> consumer = consumers.iterator().next();
- notificationConsumer = new EntityNotificationConsumer(consumer);
- notificationConsumer.start();
- }
-
- @AfterClass
- public void tearDown() {
- notificationConsumer.stop();
- }
-
- @BeforeMethod
- public void setupTest() {
- notificationConsumer.reset();
+ notificationConsumer = consumers.iterator().next();
}
@Test
@@ -97,17 +85,8 @@
final String guid = tableId._getId();
- waitForNotification(notificationConsumer, MAX_WAIT_TIME);
-
- EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
-
- assertNotNull(entityNotification);
- assertEquals(EntityNotification.OperationType.ENTITY_CREATE, entityNotification.getOperationType());
-
- IReferenceableInstance entity = entityNotification.getEntity();
-
- assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
- assertEquals(guid, entity.getId()._getId());
+ waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+ newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE, guid));
}
@Test(dependsOnMethods = "testCreateEntity")
@@ -119,19 +98,8 @@
serviceClient.updateEntityAttribute(guid, property, newValue);
- waitForNotification(notificationConsumer, MAX_WAIT_TIME);
-
- EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
-
- assertNotNull(entityNotification);
- assertEquals(EntityNotification.OperationType.ENTITY_UPDATE, entityNotification.getOperationType());
-
- IReferenceableInstance entity = entityNotification.getEntity();
-
- assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
- assertEquals(guid, entity.getId()._getId());
-
- assertEquals(newValue, entity.getValuesMap().get(property));
+ waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+ newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE, guid));
}
@Test(dependsOnMethods = "testCreateEntity")
@@ -154,18 +122,10 @@
ClientResponse clientResponse = addTrait(guid, traitInstanceJSON);
assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode());
- waitForNotification(notificationConsumer, MAX_WAIT_TIME);
-
- EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
-
- assertNotNull(entityNotification);
- assertEquals(EntityNotification.OperationType.TRAIT_ADD, entityNotification.getOperationType());
+ EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+ newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE, guid));
IReferenceableInstance entity = entityNotification.getEntity();
-
- assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
- assertEquals(guid, entity.getId()._getId());
-
assertTrue(entity.getTraits().contains(traitName));
List<IStruct> allTraits = entityNotification.getAllTraits();
@@ -178,9 +138,6 @@
assertTrue(allTraitNames.contains(superTraitName));
assertTrue(allTraitNames.contains(superSuperTraitName));
- // add another trait with the same super type to the entity
- notificationConsumer.reset();
-
String anotherTraitName = "Trait" + randomString();
createTrait(anotherTraitName, superTraitName);
@@ -191,12 +148,8 @@
clientResponse = addTrait(guid, traitInstanceJSON);
assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode());
- waitForNotification(notificationConsumer, MAX_WAIT_TIME);
-
- entityNotification = notificationConsumer.getLastEntityNotification();
-
- assertNotNull(entityNotification);
- assertEquals(EntityNotification.OperationType.TRAIT_ADD, entityNotification.getOperationType());
+ entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+ newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE, guid));
allTraits = entityNotification.getAllTraits();
allTraitNames = new LinkedList<>();
@@ -217,20 +170,10 @@
ClientResponse clientResponse = deleteTrait(guid, traitName);
Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
- waitForNotification(notificationConsumer, MAX_WAIT_TIME);
+ EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+ newNotificationPredicate(EntityNotification.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE, guid));
- EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
-
- assertNotNull(entityNotification);
- assertEquals(EntityNotification.OperationType.TRAIT_DELETE,
- entityNotification.getOperationType());
-
- IReferenceableInstance entity = entityNotification.getEntity();
-
- assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
- assertEquals(guid, entity.getId()._getId());
-
- assertFalse(entity.getTraits().contains(traitName));
+ assertFalse(entityNotification.getEntity().getTraits().contains(traitName));
}
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
index 3a4661c..e64e949 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
@@ -59,7 +59,7 @@
sendHookMessage(new HookNotification.EntityCreateRequest(entity));
- waitFor(1000, new Predicate() {
+ waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
@@ -80,7 +80,7 @@
final Referenceable newEntity = new Referenceable(DATABASE_TYPE);
newEntity.set("owner", randomString());
sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
- waitFor(1000, new Predicate() {
+ waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
Referenceable localEntity = serviceClient.getEntity(DATABASE_TYPE, "name", dbName);
@@ -106,7 +106,7 @@
newEntity.set("name", newName);
sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
- waitFor(1000, new Predicate() {
+ waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
@@ -136,7 +136,7 @@
//updating unique attribute
sendHookMessage(new HookNotification.EntityUpdateRequest(newEntity));
- waitFor(1000, new Predicate() {
+ waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
index aba191c..34abeab 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
@@ -24,7 +24,10 @@
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
-import org.apache.atlas.*;
+import kafka.consumer.ConsumerTimeoutException;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.typesystem.Referenceable;
@@ -43,6 +46,7 @@
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
+import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.web.util.Servlets;
@@ -272,6 +276,17 @@
boolean evaluate() throws Exception;
}
+ public interface NotificationPredicate {
+
+ /**
+ * Perform a predicate evaluation.
+ *
+ * @return the boolean result of the evaluation.
+ * @throws Exception thrown if the predicate evaluation could not evaluate.
+ */
+ boolean evaluate(EntityNotification notification) throws Exception;
+ }
+
/**
* Wait for a condition, expressed via a {@link Predicate} to become true.
*
@@ -292,49 +307,40 @@
}
}
- // ----- inner class : EntityNotificationConsumer --------------------------
-
- protected static class EntityNotificationConsumer implements Runnable {
- private final NotificationConsumer<EntityNotification> consumerIterator;
- private EntityNotification entityNotification = null;
- private boolean run;
-
- public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) {
- this.consumerIterator = consumerIterator;
- }
-
- @Override
- public void run() {
- while (run && consumerIterator.hasNext()) {
- entityNotification = consumerIterator.next();
- }
- }
-
- public void reset() {
- entityNotification = null;
- }
-
- public void start() {
- Thread thread = new Thread(this);
- run = true;
- thread.start();
- }
-
- public void stop() {
- run = false;
- }
-
- public EntityNotification getLastEntityNotification() {
- return entityNotification;
- }
- }
-
- protected void waitForNotification(final EntityNotificationConsumer notificationConsumer, int maxWait) throws Exception {
+ protected EntityNotification waitForNotification(final NotificationConsumer<EntityNotification> consumer, int maxWait,
+ final NotificationPredicate predicate) throws Exception {
+ final TypeUtils.Pair<EntityNotification, String> pair = TypeUtils.Pair.of(null, null);
+ final long maxCurrentTime = System.currentTimeMillis() + maxWait;
waitFor(maxWait, new Predicate() {
@Override
public boolean evaluate() throws Exception {
- return notificationConsumer.getLastEntityNotification() != null;
+ try {
+ while (consumer.hasNext() && System.currentTimeMillis() < maxCurrentTime) {
+ EntityNotification notification = consumer.next();
+ if (predicate.evaluate(notification)) {
+ pair.left = notification;
+ return true;
+ }
+ }
+ } catch(ConsumerTimeoutException e) {
+ //ignore
+ }
+ return false;
}
});
+ return pair.left;
+ }
+
+ protected NotificationPredicate newNotificationPredicate(final EntityNotification.OperationType operationType,
+ final String typeName, final String guid) {
+ return new NotificationPredicate() {
+ @Override
+ public boolean evaluate(EntityNotification notification) throws Exception {
+ return notification != null &&
+ notification.getOperationType() == operationType &&
+ notification.getEntity().getTypeName().equals(typeName) &&
+ notification.getEntity().getId()._getId().equals(guid);
+ }
+ };
}
}
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
index a268196..f2599b5 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
@@ -67,7 +67,6 @@
import java.util.UUID;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
/**
@@ -89,7 +88,7 @@
@Inject
private NotificationInterface notificationInterface;
- private EntityNotificationConsumer notificationConsumer;
+ private NotificationConsumer<EntityNotification> notificationConsumer;
@BeforeClass
public void setUp() throws Exception {
@@ -100,19 +99,7 @@
List<NotificationConsumer<EntityNotification>> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
- NotificationConsumer<EntityNotification> consumer = consumers.iterator().next();
- notificationConsumer = new EntityNotificationConsumer(consumer);
- notificationConsumer.start();
- }
-
- @AfterClass
- public void tearDown() {
- notificationConsumer.stop();
- }
-
- @BeforeMethod
- public void setupTest() {
- notificationConsumer.reset();
+ notificationConsumer = consumers.iterator().next();
}
@Test
@@ -158,20 +145,26 @@
serviceClient.createEntity(db).getString(0);
- waitForNotification(notificationConsumer, MAX_WAIT_TIME);
- EntityNotification notification = notificationConsumer.getLastEntityNotification();
- assertNotNull(notification);
- assertEquals(notification.getEntity().get("name"), dbName);
+ waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
+ @Override
+ public boolean evaluate(EntityNotification notification) throws Exception {
+ return notification != null && notification.getEntity().get("name").equals(dbName);
+ }
+ });
JSONArray results =
serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
assertEquals(results.length(), 1);
//create entity again shouldn't create another instance with same unique attribute value
- notificationConsumer.reset();
serviceClient.createEntity(db);
try {
- waitForNotification(notificationConsumer, MAX_WAIT_TIME);
+ waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
+ @Override
+ public boolean evaluate(EntityNotification notification) throws Exception {
+ return notification != null && notification.getEntity().get("name").equals(dbName);
+ }
+ });
fail("Expected time out exception");
} catch (Exception e) {
//expected timeout