Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-atlas into branch-0.6-incubating
diff --git a/distro/src/conf/application.properties b/distro/src/conf/application.properties
index f9888bd..cb6ee31 100755
--- a/distro/src/conf/application.properties
+++ b/distro/src/conf/application.properties
@@ -57,6 +57,7 @@
 atlas.kafka.zookeeper.session.timeout.ms=400
 atlas.kafka.zookeeper.sync.time.ms=20
 atlas.kafka.auto.commit.interval.ms=1000
+atlas.kafka.auto.offset.reset=smallest
 atlas.kafka.hook.group.id=atlas
 
 
diff --git a/docs/src/site/twiki/Bridge-Hive.twiki b/docs/src/site/twiki/Bridge-Hive.twiki
index 0c7732b..c8e9009 100644
--- a/docs/src/site/twiki/Bridge-Hive.twiki
+++ b/docs/src/site/twiki/Bridge-Hive.twiki
@@ -19,12 +19,12 @@
 hive_process(ClassType) - super types [Process] - attributes [startTime, endTime, userName, operationType, queryText, queryPlan, queryId, queryGraph]
 </verbatim>
 
-The entities are created and de-duped using unique qualified name. They provide namespace and can be used for querying/lineage as well. Note that dbName and tableName should be in lower case. clusterName is explained below:
-hive_db - attribute qualifiedName - <dbName>@<clusterName>
-hive_table - attribute name - <dbName>.<tableName>@<clusterName>
-hive_column - attribute qualifiedName - <dbName>.<tableName>.<columnName>@<clusterName>
-hive_partition - attribute qualifiedName - <dbName>.<tableName>.<partitionValues('-' separated)>@<clusterName>
-hive_process - attribute name - <queryString> - trimmed query string in lower case
+The entities are created and de-duped using unique qualified name. They provide namespace and can be used for querying/lineage as well. Note that dbName and tableName should be in lower case. clusterName is explained below.
+   * hive_db - attribute qualifiedName - <dbName>@<clusterName>
+   * hive_table - attribute name - <dbName>.<tableName>@<clusterName>
+   * hive_column - attribute qualifiedName - <dbName>.<tableName>.<columnName>@<clusterName>
+   * hive_partition - attribute qualifiedName - <dbName>.<tableName>.<partitionValues('-' separated)>@<clusterName>
+   * hive_process - attribute name - <queryString> - trimmed query string in lower case
 
 
 ---++ Importing Hive Metadata
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index 4e13d61..8b942ab 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -53,8 +53,7 @@
    echo "grant 'atlas', 'RWXCA', 'titan'" | hbase shell
 </verbatim>
 
----++++ Graph Search Index
-
+---+++ Graph Search Index
 This section sets up the graph db - titan - to use an search indexing system. The example
 configuration below sets up to use an embedded Elastic search indexing system.
 
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
index d4e07c0..1f05df4 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
@@ -69,4 +69,10 @@
                 consumerId, message.topic(), message.partition(), message.offset(), message.message());
         return (String) message.message();
     }
+
+    @Override
+    protected String peekMessage() {
+        MessageAndMetadata message = (MessageAndMetadata) iterator.peek();
+        return (String) message.message();
+    }
 }
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
index 42a4e7f..b6a9d7b 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
@@ -94,10 +94,11 @@
     }
 
     @Override
-    public void remove() {
-        throw new UnsupportedOperationException("The remove method is not supported.");
+    public T peek() {
+        return GSON.fromJson(peekMessage(), type);
     }
 
+    protected abstract String peekMessage();
 
     // ----- inner class : ImmutableListDeserializer ---------------------------
 
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 346ec3e..d2da975 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -17,8 +17,11 @@
 
 package org.apache.atlas.notification;
 
-import java.util.Iterator;
-
 // TODO : docs!
-public interface NotificationConsumer<T> extends Iterator<T>{
+public interface NotificationConsumer<T>{
+    boolean hasNext();
+
+    T next();
+
+    T peek();
 }
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 6876758..3352cd0 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -19,10 +19,10 @@
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import kafka.consumer.ConsumerTimeoutException;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
-import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
@@ -62,7 +62,7 @@
         executors = Executors.newFixedThreadPool(consumers.size());
 
         for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : consumers) {
-            executors.submit(new HookConsumer(atlasClient, consumer));
+            executors.submit(new HookConsumer(consumer));
         }
     }
 
@@ -89,12 +89,23 @@
         private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
         private final AtlasClient client;
 
-        public HookConsumer(AtlasClient atlasClient,
-                            NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
-            this.client = atlasClient;
+        public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
+            this(atlasClient, consumer);
+        }
+
+        public HookConsumer(AtlasClient client, NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
+            this.client = client;
             this.consumer = consumer;
         }
 
+        private boolean hasNext() {
+            try {
+                return consumer.hasNext();
+            } catch(ConsumerTimeoutException e) {
+                return false;
+            }
+        }
+
         @Override
         public void run() {
 
@@ -102,33 +113,39 @@
                 return;
             }
 
-            while(consumer.hasNext()) {
-                HookNotification.HookNotificationMessage message = consumer.next();
-
+            while(true) {
                 try {
-                    switch (message.getType()) {
-                        case ENTITY_CREATE:
-                            HookNotification.EntityCreateRequest createRequest =
-                                    (HookNotification.EntityCreateRequest) message;
-                            atlasClient.createEntity(createRequest.getEntities());
-                            break;
+                    if (hasNext()) {
+                        HookNotification.HookNotificationMessage message = consumer.next();
+                        try {
+                            switch (message.getType()) {
+                                case ENTITY_CREATE:
+                                    HookNotification.EntityCreateRequest createRequest =
+                                            (HookNotification.EntityCreateRequest) message;
+                                    atlasClient.createEntity(createRequest.getEntities());
+                                    break;
 
-                        case ENTITY_PARTIAL_UPDATE:
-                            HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
-                                    (HookNotification.EntityPartialUpdateRequest) message;
-                            atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
-                                    partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue(),
-                                    partialUpdateRequest.getEntity());
-                            break;
+                                case ENTITY_PARTIAL_UPDATE:
+                                    HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
+                                            (HookNotification.EntityPartialUpdateRequest) message;
+                                    atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
+                                            partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue(),
+                                            partialUpdateRequest.getEntity());
+                                    break;
 
-                        case ENTITY_FULL_UPDATE:
-                            HookNotification.EntityUpdateRequest updateRequest =
-                                    (HookNotification.EntityUpdateRequest) message;
-                            atlasClient.updateEntities(updateRequest.getEntities());
-                            break;
+                                case ENTITY_FULL_UPDATE:
+                                    HookNotification.EntityUpdateRequest updateRequest =
+                                            (HookNotification.EntityUpdateRequest) message;
+                                    atlasClient.updateEntities(updateRequest.getEntities());
+                                    break;
+                            }
+                        } catch (Exception e) {
+                            //todo handle failures
+                            LOG.warn("Error handling message {}", message, e);
+                        }
                     }
-                } catch (Exception e) {
-                    LOG.debug("Error handling message {}", message, e);
+                } catch(Throwable t) {
+                    LOG.warn("Failure in NotificationHookConsumer", t);
                 }
             }
         }
@@ -146,7 +163,7 @@
                         return false;
                     }
                 }
-            } catch (AtlasServiceException e) {
+            } catch (Throwable e) {
                 LOG.info(
                         "Handled AtlasServiceException while waiting for Atlas Server to become ready, " +
                                 "exiting consumer thread.", e);
diff --git a/release-log.txt b/release-log.txt
index 6866bfd..0588ff9 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,19 +1,25 @@
 Apache Atlas Release Notes
 ==========================
 
---Release 0.6-incubating
+--trunk - unreleased
+INCOMPATIBLE CHANGES:
 
+ALL CHANGES:
+
+
+--Release 0.6-incubating
 INCOMPATIBLE CHANGES:
 ATLAS-58 Make hive hook reliable (shwethags)
 ATLAS-54 Rename configs in hive hook (shwethags)
 ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
 
 ALL CHANGES:
+ATLAS-394 Fix BaseResourceIT.waitForNotification (shwethags via sumasai)
 ATLAS-385 Support for Lineage for entities with SuperType as DataSet (anilsg via sumasai)
 ATLAS-342 Atlas is sending an ENTITY_CREATE event to the ATLAS_ENTITIES topic even if the entity exists already (shwethags)
 ATLAS-386 Handle hive rename Table (shwethags)
-ATLAS-374 Doc: Create a wiki for documenting fault tolerance and HA options for Atlas data (yhemath via sumasai)
-ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown(yhemath via sumasai)
+ATLAS-374 Doc: Create a wiki for documenting fault tolerance and HA options for Atlas data (yhemanth via sumasai)
+ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)
 ATLAS-382 Fixed Hive Bridge doc for ATLAS cluster name (sumasai)
 ATLAS-244 UI: Add Tag Tab (darshankumar89 via sumasai)
 ATLAS-376 UI: Use the Schema API of the backend to populate details for Schema tab (darshankumar89 via sumasai)
@@ -22,8 +28,8 @@
 ATLAS-47 Entity mutations for complex types (sumasai via shwethags)
 ATLAS-345 UI: Should allow tag addition on any search result that returns a reference-able entity (darshankumar89 via shwethags)
 ATLAS-279 UI not displaying results for certain successful "select" search queries (anilsg via shwethags)
-ATLAS-361 Add validation when index backends are switched in ATLAS configuration(sumasai via shwethags)
 ATLAS-242 The qualified name for hive entities should be backward compatible (shwethags)
+ATLAS-361 Add validation when index backends are switched in ATLAS configuration (sumasai via shwethags)
 ATLAS-171 Ability to update type definition(shwethags via sumasai)
 ATLAS-352 Improve write performance on type and entity creation with Hbase (sumasai)
 ATLAS-350 Document jaas config details for atlas (tbeerbower via shwethags)
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index 6b2d5d1..9955f07 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -252,7 +252,7 @@
      */
     public Vertex getVertexForInstanceByUniqueAttribute(ClassType classType, IReferenceableInstance instance)
         throws AtlasException {
-
+        LOG.debug("Checking if there is an instance with the same unique attributes for instance {}", instance);
         Vertex result = null;
         for (AttributeInfo attributeInfo : classType.fieldMapping().fields.values()) {
             if (attributeInfo.isUnique) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java b/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java
index 26d5e2b..6ab7a21 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java
@@ -22,9 +22,7 @@
 import com.google.inject.Provides;
 import com.thinkaurelius.titan.core.TitanFactory;
 import com.thinkaurelius.titan.core.TitanGraph;
-import com.thinkaurelius.titan.core.TitanTransaction;
 import com.thinkaurelius.titan.core.schema.TitanManagement;
-import com.thinkaurelius.titan.diskstorage.Backend;
 import com.thinkaurelius.titan.diskstorage.StandardIndexProvider;
 import com.thinkaurelius.titan.diskstorage.indexing.IndexInformation;
 import com.thinkaurelius.titan.diskstorage.solr.Solr5Index;
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
index 996f31b..2f3eb30 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
@@ -231,12 +231,14 @@
         List<ITypedReferenceableInstance> instancesToUpdate = new ArrayList<>();
 
         for (IReferenceableInstance instance : instances) {
+            LOG.debug("Discovering instance to create/update for {}", instance);
             ITypedReferenceableInstance newInstance;
             Id id = instance.getId();
 
             if (!idToVertexMap.containsKey(id)) {
                 Vertex instanceVertex;
                 if (id.isAssigned()) {  // has a GUID
+                    LOG.debug("Instance {} has an assigned id", instance.getId()._getId());
                     instanceVertex = graphHelper.getVertexForGUID(id.id);
                     if (!(instance instanceof ReferenceableInstance)) {
                         throw new IllegalStateException(
@@ -252,6 +254,7 @@
 
                     //no entity with the given unique attribute, create new
                     if (instanceVertex == null) {
+                        LOG.debug("Creating new vertex for instance {}", instance);
                         newInstance = classType.convert(instance, Multiplicity.REQUIRED);
                         instanceVertex = graphHelper.createVertexWithIdentity(newInstance, classType.getAllSuperTypeNames());
                         instancesToCreate.add(newInstance);
@@ -260,6 +263,7 @@
                         mapInstanceToVertex(newInstance, instanceVertex, classType.fieldMapping().fields, true, Operation.CREATE);
 
                     } else {
+                        LOG.debug("Re-using existing vertex {} for instance {}", instanceVertex.getId(), instance);
                         if (!(instance instanceof ReferenceableInstance)) {
                             throw new IllegalStateException(
                                     String.format("%s is not of type ITypedReferenceableInstance", instance));
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java
index d824b50..6fc7008 100644
--- a/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java
@@ -19,9 +19,12 @@
 
 import com.thinkaurelius.titan.core.TitanGraph;
 import com.thinkaurelius.titan.core.util.TitanCleanup;
+import com.thinkaurelius.titan.diskstorage.Backend;
+import com.thinkaurelius.titan.graphdb.database.StandardTitanGraph;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.commons.configuration.Configuration;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeTest;
diff --git a/server-api/pom.xml b/server-api/pom.xml
index c7488d5..88de030 100644
--- a/server-api/pom.xml
+++ b/server-api/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>apache-atlas</artifactId>
         <groupId>org.apache.atlas</groupId>
-        <version>0.6-incubating</version>
+        <version>0.6-incubating-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
diff --git a/typesystem/src/main/resources/application.properties b/typesystem/src/main/resources/application.properties
index d475d7e..702c6f2 100644
--- a/typesystem/src/main/resources/application.properties
+++ b/typesystem/src/main/resources/application.properties
@@ -66,6 +66,7 @@
 atlas.kafka.data=${sys:atlas.data}/kafka
 atlas.kafka.zookeeper.session.timeout.ms=400
 atlas.kafka.zookeeper.sync.time.ms=20
+atlas.kafka.consumer.timeout.ms=100
 atlas.kafka.auto.commit.interval.ms=100
 atlas.kafka.hook.group.id=atlas
 atlas.kafka.entities.group.id=atlas_entities
diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
index b2501ec..cd4e743 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
@@ -35,10 +35,8 @@
 import org.apache.atlas.typesystem.types.utils.TypesUtil;
 import org.apache.atlas.web.resources.BaseResourceIT;
 import org.apache.atlas.web.util.Servlets;
-import org.junit.AfterClass;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
@@ -48,7 +46,9 @@
 import java.util.LinkedList;
 import java.util.List;
 
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 /**
  * Entity Notification Integration Tests.
@@ -62,9 +62,9 @@
     private final String TABLE_NAME = "table" + randomString();
     @Inject
     private NotificationInterface notificationInterface;
-    private EntityNotificationConsumer notificationConsumer;
     private Id tableId;
     private String traitName;
+    private NotificationConsumer<EntityNotification> notificationConsumer;
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -74,19 +74,7 @@
         List<NotificationConsumer<EntityNotification>> consumers =
             notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
 
-        NotificationConsumer<EntityNotification> consumer = consumers.iterator().next();
-        notificationConsumer = new EntityNotificationConsumer(consumer);
-        notificationConsumer.start();
-    }
-
-    @AfterClass
-    public void tearDown() {
-        notificationConsumer.stop();
-    }
-
-    @BeforeMethod
-    public void setupTest() {
-        notificationConsumer.reset();
+        notificationConsumer = consumers.iterator().next();
     }
 
     @Test
@@ -97,17 +85,8 @@
 
         final String guid = tableId._getId();
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
-
-        EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
-
-        assertNotNull(entityNotification);
-        assertEquals(EntityNotification.OperationType.ENTITY_CREATE, entityNotification.getOperationType());
-
-        IReferenceableInstance entity = entityNotification.getEntity();
-
-        assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
-        assertEquals(guid, entity.getId()._getId());
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+                newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE, guid));
     }
 
     @Test(dependsOnMethods = "testCreateEntity")
@@ -119,19 +98,8 @@
 
         serviceClient.updateEntityAttribute(guid, property, newValue);
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
-
-        EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
-
-        assertNotNull(entityNotification);
-        assertEquals(EntityNotification.OperationType.ENTITY_UPDATE, entityNotification.getOperationType());
-
-        IReferenceableInstance entity = entityNotification.getEntity();
-
-        assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
-        assertEquals(guid, entity.getId()._getId());
-
-        assertEquals(newValue, entity.getValuesMap().get(property));
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+                newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE, guid));
     }
 
     @Test(dependsOnMethods = "testCreateEntity")
@@ -154,18 +122,10 @@
         ClientResponse clientResponse = addTrait(guid, traitInstanceJSON);
         assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode());
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
-
-        EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
-
-        assertNotNull(entityNotification);
-        assertEquals(EntityNotification.OperationType.TRAIT_ADD, entityNotification.getOperationType());
+        EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+                newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE, guid));
 
         IReferenceableInstance entity = entityNotification.getEntity();
-
-        assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
-        assertEquals(guid, entity.getId()._getId());
-
         assertTrue(entity.getTraits().contains(traitName));
 
         List<IStruct> allTraits = entityNotification.getAllTraits();
@@ -178,9 +138,6 @@
         assertTrue(allTraitNames.contains(superTraitName));
         assertTrue(allTraitNames.contains(superSuperTraitName));
 
-        // add another trait with the same super type to the entity
-        notificationConsumer.reset();
-
         String anotherTraitName = "Trait" + randomString();
         createTrait(anotherTraitName, superTraitName);
 
@@ -191,12 +148,8 @@
         clientResponse = addTrait(guid, traitInstanceJSON);
         assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode());
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
-
-        entityNotification = notificationConsumer.getLastEntityNotification();
-
-        assertNotNull(entityNotification);
-        assertEquals(EntityNotification.OperationType.TRAIT_ADD, entityNotification.getOperationType());
+        entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+                newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE, guid));
 
         allTraits = entityNotification.getAllTraits();
         allTraitNames = new LinkedList<>();
@@ -217,20 +170,10 @@
         ClientResponse clientResponse = deleteTrait(guid, traitName);
         Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
+        EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+                newNotificationPredicate(EntityNotification.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE, guid));
 
-        EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
-
-        assertNotNull(entityNotification);
-        assertEquals(EntityNotification.OperationType.TRAIT_DELETE,
-            entityNotification.getOperationType());
-
-        IReferenceableInstance entity = entityNotification.getEntity();
-
-        assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
-        assertEquals(guid, entity.getId()._getId());
-
-        assertFalse(entity.getTraits().contains(traitName));
+        assertFalse(entityNotification.getEntity().getTraits().contains(traitName));
     }
 
 
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
index 3a4661c..e64e949 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
@@ -59,7 +59,7 @@
 
         sendHookMessage(new HookNotification.EntityCreateRequest(entity));
 
-        waitFor(1000, new Predicate() {
+        waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
@@ -80,7 +80,7 @@
         final Referenceable newEntity = new Referenceable(DATABASE_TYPE);
         newEntity.set("owner", randomString());
         sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
-        waitFor(1000, new Predicate() {
+        waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 Referenceable localEntity = serviceClient.getEntity(DATABASE_TYPE, "name", dbName);
@@ -106,7 +106,7 @@
         newEntity.set("name", newName);
 
         sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
-        waitFor(1000, new Predicate() {
+        waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
@@ -136,7 +136,7 @@
 
         //updating unique attribute
         sendHookMessage(new HookNotification.EntityUpdateRequest(newEntity));
-        waitFor(1000, new Predicate() {
+        waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
index aba191c..34abeab 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
@@ -24,7 +24,10 @@
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
-import org.apache.atlas.*;
+import kafka.consumer.ConsumerTimeoutException;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.entity.EntityNotification;
 import org.apache.atlas.typesystem.Referenceable;
@@ -43,6 +46,7 @@
 import org.apache.atlas.typesystem.types.Multiplicity;
 import org.apache.atlas.typesystem.types.StructTypeDefinition;
 import org.apache.atlas.typesystem.types.TraitType;
+import org.apache.atlas.typesystem.types.TypeUtils;
 import org.apache.atlas.typesystem.types.utils.TypesUtil;
 import org.apache.atlas.utils.ParamChecker;
 import org.apache.atlas.web.util.Servlets;
@@ -272,6 +276,17 @@
         boolean evaluate() throws Exception;
     }
 
+    public interface NotificationPredicate {
+
+        /**
+         * Perform a predicate evaluation.
+         *
+         * @return the boolean result of the evaluation.
+         * @throws Exception thrown if the predicate evaluation could not evaluate.
+         */
+        boolean evaluate(EntityNotification notification) throws Exception;
+    }
+
     /**
      * Wait for a condition, expressed via a {@link Predicate} to become true.
      *
@@ -292,49 +307,40 @@
         }
     }
 
-    // ----- inner class : EntityNotificationConsumer --------------------------
-
-    protected static class EntityNotificationConsumer implements Runnable {
-        private final NotificationConsumer<EntityNotification> consumerIterator;
-        private EntityNotification entityNotification = null;
-        private boolean run;
-
-        public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) {
-            this.consumerIterator = consumerIterator;
-        }
-
-        @Override
-        public void run() {
-            while (run && consumerIterator.hasNext()) {
-                entityNotification = consumerIterator.next();
-            }
-        }
-
-        public void reset() {
-            entityNotification = null;
-        }
-
-        public void start() {
-            Thread thread = new Thread(this);
-            run = true;
-            thread.start();
-        }
-
-        public void stop() {
-            run = false;
-        }
-
-        public EntityNotification getLastEntityNotification() {
-            return entityNotification;
-        }
-    }
-
-    protected void waitForNotification(final EntityNotificationConsumer notificationConsumer, int maxWait) throws Exception {
+    protected EntityNotification waitForNotification(final NotificationConsumer<EntityNotification> consumer, int maxWait,
+                                                     final NotificationPredicate predicate) throws Exception {
+        final TypeUtils.Pair<EntityNotification, String> pair = TypeUtils.Pair.of(null, null);
+        final long maxCurrentTime = System.currentTimeMillis() + maxWait;
         waitFor(maxWait, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
-                return notificationConsumer.getLastEntityNotification() != null;
+                try {
+                    while (consumer.hasNext() && System.currentTimeMillis() < maxCurrentTime) {
+                        EntityNotification notification = consumer.next();
+                        if (predicate.evaluate(notification)) {
+                            pair.left = notification;
+                            return true;
+                        }
+                    }
+                } catch(ConsumerTimeoutException e) {
+                    //ignore
+                }
+                return false;
             }
         });
+        return pair.left;
+    }
+
+    protected NotificationPredicate newNotificationPredicate(final EntityNotification.OperationType operationType,
+                                                             final String typeName, final String guid) {
+        return new NotificationPredicate() {
+            @Override
+            public boolean evaluate(EntityNotification notification) throws Exception {
+                return notification != null &&
+                        notification.getOperationType() == operationType &&
+                        notification.getEntity().getTypeName().equals(typeName) &&
+                        notification.getEntity().getId()._getId().equals(guid);
+            }
+        };
     }
 }
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
index a268196..f2599b5 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
@@ -67,7 +67,6 @@
 import java.util.UUID;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.fail;
 
 /**
@@ -89,7 +88,7 @@
 
     @Inject
     private NotificationInterface notificationInterface;
-    private EntityNotificationConsumer notificationConsumer;
+    private NotificationConsumer<EntityNotification> notificationConsumer;
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -100,19 +99,7 @@
         List<NotificationConsumer<EntityNotification>> consumers =
                 notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
 
-        NotificationConsumer<EntityNotification> consumer = consumers.iterator().next();
-        notificationConsumer = new EntityNotificationConsumer(consumer);
-        notificationConsumer.start();
-    }
-
-    @AfterClass
-    public void tearDown() {
-        notificationConsumer.stop();
-    }
-
-    @BeforeMethod
-    public void setupTest() {
-        notificationConsumer.reset();
+        notificationConsumer = consumers.iterator().next();
     }
 
     @Test
@@ -158,20 +145,26 @@
 
         serviceClient.createEntity(db).getString(0);
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
-        EntityNotification notification = notificationConsumer.getLastEntityNotification();
-        assertNotNull(notification);
-        assertEquals(notification.getEntity().get("name"), dbName);
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
+            @Override
+            public boolean evaluate(EntityNotification notification) throws Exception {
+                return notification != null && notification.getEntity().get("name").equals(dbName);
+            }
+        });
 
         JSONArray results =
                 serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
         assertEquals(results.length(), 1);
 
         //create entity again shouldn't create another instance with same unique attribute value
-        notificationConsumer.reset();
         serviceClient.createEntity(db);
         try {
-            waitForNotification(notificationConsumer, MAX_WAIT_TIME);
+            waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
+                @Override
+                public boolean evaluate(EntityNotification notification) throws Exception {
+                    return notification != null && notification.getEntity().get("name").equals(dbName);
+                }
+            });
             fail("Expected time out exception");
         } catch (Exception e) {
             //expected timeout