ATLAS-1766: updated NotificationConsumer implementation to use new Kafka Consumer API, to enable support for SASL_SSL protocol
Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 5e59528..474f253 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -74,9 +74,13 @@
atlas.kafka.zookeeper.connection.timeout.ms=200
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
-atlas.kafka.auto.offset.reset=smallest
atlas.kafka.hook.group.id=atlas
-atlas.kafka.auto.commit.enable=false
+
+atlas.kafka.enable.auto.commit=false
+atlas.kafka.auto.offset.reset=earliest
+atlas.kafka.session.timeout.ms=30000
+
+
atlas.notification.create.topics=true
atlas.notification.replicas=1
atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
new file mode 100644
index 0000000..9c15243
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.kafka;
+
+import org.apache.atlas.notification.AbstractNotificationConsumer;
+import org.apache.atlas.notification.MessageDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+
+/**
+ * Kafka specific notification consumer.
+ *
+ * @param <T> the notification type returned by this consumer
+ */
+public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasKafkaConsumer.class);
+
+ private final KafkaConsumer kafkaConsumer;
+ private final boolean autoCommitEnabled;
+
+ public AtlasKafkaConsumer(MessageDeserializer<T> deserializer, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled) {
+ super(deserializer);
+
+ this.kafkaConsumer = kafkaConsumer;
+ this.autoCommitEnabled = autoCommitEnabled;
+ }
+
+ public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+ List<AtlasKafkaMessage<T>> messages = new ArrayList();
+
+ ConsumerRecords<?, ?> records = kafkaConsumer.poll(timeoutMilliSeconds);
+
+ if (records != null) {
+ for (ConsumerRecord<?, ?> record : records) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received Message topic ={}, partition ={}, offset = {}, key = {}, value = {}",
+ record.topic(), record.partition(), record.offset(), record.key(), record.value());
+ }
+
+ T message = deserializer.deserialize(record.value().toString());
+
+ messages.add(new AtlasKafkaMessage(message, record.offset(), record.partition()));
+ }
+ }
+
+ return messages;
+ }
+
+
+ @Override
+ public void commit(TopicPartition partition, long offset) {
+ if (!autoCommitEnabled) {
+ if (LOG.isDebugEnabled()) {
+                LOG.debug("Committing the offset ==>> {}", offset);
+ }
+ kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
+ }
+ }
+
+ @Override
+ public void close() {
+ if (kafkaConsumer != null) {
+ kafkaConsumer.close();
+ }
+ }
+}
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
new file mode 100644
index 0000000..cdbf57f
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
@@ -0,0 +1,44 @@
+package org.apache.atlas.kafka;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class AtlasKafkaMessage<T> {
+ private final T message;
+ private final long offset;
+ private final int partition;
+
+ public AtlasKafkaMessage(T message, long offset, int partition) {
+ this.message = message;
+ this.offset = offset;
+ this.partition = partition;
+ }
+
+ public T getMessage() {
+ return message;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+}
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
deleted file mode 100644
index 16c0eb2..0000000
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.kafka;
-
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import org.apache.atlas.notification.AbstractNotificationConsumer;
-import org.apache.atlas.notification.MessageDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Kafka specific notification consumer.
- *
- * @param <T> the notification type returned by this consumer
- */
-public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
-
- private final int consumerId;
- private final ConsumerIterator iterator;
- private final ConsumerConnector consumerConnector;
- private final boolean autoCommitEnabled;
- private long lastSeenOffset;
-
-
- // ----- Constructors ----------------------------------------------------
-
- /**
- * Create a Kafka consumer.
- * @param deserializer the message deserializer used for this consumer
- * @param stream the underlying Kafka stream
- * @param consumerId an id value for this consumer
- * @param consumerConnector the {@link ConsumerConnector} which created the underlying Kafka stream
- * @param autoCommitEnabled true if consumer does not need to commit offsets explicitly, false otherwise.
- */
- public KafkaConsumer(MessageDeserializer<T> deserializer, KafkaStream<String, String> stream, int consumerId,
- ConsumerConnector consumerConnector, boolean autoCommitEnabled) {
- super(deserializer);
- this.consumerConnector = consumerConnector;
- this.lastSeenOffset = 0;
- this.iterator = stream.iterator();
- this.consumerId = consumerId;
- this.autoCommitEnabled = autoCommitEnabled;
- }
-
-
- // ----- NotificationConsumer --------------------------------------------
-
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
-
- // ----- AbstractNotificationConsumer ------------------------------------
-
- @Override
- public String getNext() {
- MessageAndMetadata message = iterator.next();
- LOG.debug("Read message: conumerId: {}, topic - {}, partition - {}, offset - {}, message - {}",
- consumerId, message.topic(), message.partition(), message.offset(), message.message());
- lastSeenOffset = message.offset();
- return (String) message.message();
- }
-
- @Override
- protected String peekMessage() {
- MessageAndMetadata message = (MessageAndMetadata) iterator.peek();
- return (String) message.message();
- }
-
- @Override
- public void commit() {
- if (autoCommitEnabled) {
- LOG.debug("Auto commit is disabled, not committing.");
- } else {
- consumerConnector.commitOffsets();
- LOG.debug("Committed offset: {}", lastSeenOffset);
- }
- }
-
- @Override
- public void close() {
- consumerConnector.shutdown();
- }
-}
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 8bd31fd..366c8a7 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -18,25 +18,22 @@
package org.apache.atlas.kafka;
import com.google.common.annotations.VisibleForTesting;
-import kafka.consumer.Consumer;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.serializer.StringDecoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -56,10 +53,11 @@
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.Future;
@@ -83,9 +81,8 @@
private KafkaServer kafkaServer;
private ServerCnxnFactory factory;
private Properties properties;
-
+ private KafkaConsumer consumer = null;
private KafkaProducer producer = null;
- private List<ConsumerConnector> consumerConnectors = new ArrayList<>();
private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() {
{
@@ -126,8 +123,7 @@
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
- properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "roundrobin");
- properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
@VisibleForTesting
@@ -171,34 +167,18 @@
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
int numConsumers) {
return createConsumers(notificationType, numConsumers,
- Boolean.valueOf(properties.getProperty("auto.commit.enable", "true")));
+ Boolean.valueOf(properties.getProperty("enable.auto.commit", "true")));
}
@VisibleForTesting
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
int numConsumers, boolean autoCommitEnabled) {
- String topic = TOPIC_MAP.get(notificationType);
Properties consumerProperties = getConsumerProperties(notificationType);
- List<NotificationConsumer<T>> consumers = new ArrayList<>(numConsumers);
- for (int i = 0; i < numConsumers; i++) {
- ConsumerConnector consumerConnector = createConsumerConnector(consumerProperties);
- Map<String, Integer> topicCountMap = new HashMap<>();
- topicCountMap.put(topic, 1);
- StringDecoder decoder = new StringDecoder(null);
- Map<String, List<KafkaStream<String, String>>> streamsMap =
- consumerConnector.createMessageStreams(topicCountMap, decoder, decoder);
- List<KafkaStream<String, String>> kafkaConsumers = streamsMap.get(topic);
- for (KafkaStream stream : kafkaConsumers) {
- KafkaConsumer<T> kafkaConsumer =
- createKafkaConsumer(notificationType.getClassType(), notificationType.getDeserializer(),
- stream, i, consumerConnector, autoCommitEnabled);
- consumers.add(kafkaConsumer);
- }
- consumerConnectors.add(consumerConnector);
- }
-
+ List<NotificationConsumer<T>> consumers = new ArrayList<>();
+ AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType.getDeserializer(), getKafkaConsumer(consumerProperties,notificationType, autoCommitEnabled), autoCommitEnabled);
+ consumers.add(kafkaConsumer);
return consumers;
}
@@ -208,11 +188,6 @@
producer.close();
producer = null;
}
-
- for (ConsumerConnector consumerConnector : consumerConnectors) {
- consumerConnector.shutdown();
- }
- consumerConnectors.clear();
}
@@ -254,43 +229,31 @@
}
}
- // ----- helper methods --------------------------------------------------
- /**
- * Create a Kafka consumer connector from the given properties.
- *
- * @param consumerProperties the properties for creating the consumer connector
- *
- * @return a new Kafka consumer connector
- */
- protected ConsumerConnector createConsumerConnector(Properties consumerProperties) {
- return Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(consumerProperties));
+ public KafkaConsumer getKafkaConsumer(Properties consumerProperties, NotificationType type, boolean autoCommitEnabled) {
+ if(this.consumer == null) {
+ try {
+ String topic = TOPIC_MAP.get(type);
+ consumerProperties.put("enable.auto.commit", autoCommitEnabled);
+ this.consumer = new KafkaConsumer(consumerProperties);
+ this.consumer.subscribe(Arrays.asList(topic));
+            } catch (Exception ee) {
+ LOG.error("Exception in getKafkaConsumer ", ee);
+ }
+ }
+
+ return this.consumer;
}
- /**
- * Create a Kafka consumer from the given Kafka stream.
- *
- * @param type the notification type to be returned by the consumer
- * @param deserializer the deserializer for the created consumers
- * @param stream the Kafka stream
- * @param consumerId the id for the new consumer
- *
- * @param consumerConnector
- * @return a new Kafka consumer
- */
- protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
- createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream,
- int consumerId, ConsumerConnector consumerConnector, boolean autoCommitEnabled) {
- return new org.apache.atlas.kafka.KafkaConsumer<>(deserializer, stream,
- consumerId, consumerConnector, autoCommitEnabled);
- }
+
+
// Get properties for consumer request
private Properties getConsumerProperties(NotificationType type) {
// find the configured group id for the given notification type
- String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
- if (groupId == null) {
+ String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
+ if (StringUtils.isEmpty(groupId)) {
throw new IllegalStateException("No configuration group id set for the notification type " + type);
}
@@ -298,7 +261,7 @@
consumerProperties.putAll(properties);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- LOG.info("Consumer property: auto.commit.enable: {}", consumerProperties.getProperty("auto.commit.enable"));
+ LOG.info("Consumer property: atlas.kafka.enable.auto.commit: {}", consumerProperties.getProperty("enable.auto.commit"));
return consumerProperties;
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
index 9585827..ec99372 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
@@ -128,7 +128,7 @@
/**
* Deserializer for JSONArray.
*/
- protected static final class JSONArrayDeserializer implements JsonDeserializer<JSONArray> {
+ public static final class JSONArrayDeserializer implements JsonDeserializer<JSONArray> {
@Override
public JSONArray deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) {
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
index d4d78de..8cf1e8e 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
@@ -16,6 +16,7 @@
* limitations under the License.
*/
package org.apache.atlas.notification;
+import org.apache.kafka.common.TopicPartition;
/**
* Abstract notification consumer.
@@ -25,10 +26,9 @@
/**
* Deserializer used to deserialize notification messages for this consumer.
*/
- private final MessageDeserializer<T> deserializer;
+ protected final MessageDeserializer<T> deserializer;
- // ----- Constructors ----------------------------------------------------
/**
* Construct an AbstractNotificationConsumer.
@@ -40,34 +40,6 @@
}
- // ----- AbstractNotificationConsumer -------------------------------------
- /**
- * Get the next notification as a string.
- *
- * @return the next notification in string form
- */
- protected abstract String getNext();
-
- /**
- * Get the next notification as a string without advancing.
- *
- * @return the next notification in string form
- */
- protected abstract String peekMessage();
-
-
- // ----- NotificationConsumer ---------------------------------------------
-
- @Override
- public T next() {
- return deserializer.deserialize(getNext());
- }
-
- @Override
- public T peek() {
- return deserializer.deserialize(peekMessage());
- }
-
- public abstract void commit();
+ public abstract void commit(TopicPartition partition, long offset);
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index a99cb10..22e40f9 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -17,32 +17,16 @@
*/
package org.apache.atlas.notification;
+import java.util.List;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+
/**
* Atlas notification consumer. This consumer blocks until a notification can be read.
*
* @param <T> the class type of notifications returned by this consumer
*/
public interface NotificationConsumer<T> {
- /**
- * Returns true when the consumer has more notifications. Blocks until a notification becomes available.
- *
- * @return true when the consumer has notifications to be read
- */
- boolean hasNext();
-
- /**
- * Returns the next notification.
- *
- * @return the next notification
- */
- T next();
-
- /**
- * Returns the next notification without advancing.
- *
- * @return the next notification
- */
- T peek();
/**
* Commit the offset of messages that have been successfully processed.
@@ -51,7 +35,14 @@
* the consumer is ready to handle the next message, which could happen even after a normal or an abnormal
* restart.
*/
- void commit();
+ void commit(TopicPartition partition, long offset);
void close();
+
+ /**
+ * Fetch data for the topics from Kafka
+ * @param timeoutMilliSeconds poll timeout
+     * @return List containing kafka message and partition id and offset.
+ */
+ List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds);
}
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index ad7d93e..70059cb 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -18,13 +18,9 @@
package org.apache.atlas.kafka;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.MessageVersion;
-import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.IncompatibleVersionException;
import org.apache.atlas.notification.VersionedMessage;
@@ -33,6 +29,11 @@
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
import org.codehaus.jettison.json.JSONException;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@@ -42,7 +43,10 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.NoSuchElementException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -57,8 +61,10 @@
private static final String TRAIT_NAME = "MyTrait";
+
@Mock
- private ConsumerConnector consumerConnector;
+ private KafkaConsumer kafkaConsumer;
+
@BeforeMethod
public void setup() {
@@ -66,9 +72,9 @@
}
@Test
- public void testNext() throws Exception {
- KafkaStream<String, String> stream = mock(KafkaStream.class);
- ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
+ public void testReceive() throws Exception {
+
+
MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
Referenceable entity = getEntity(TRAIT_NAME);
@@ -78,29 +84,34 @@
String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message));
- when(stream.iterator()).thenReturn(iterator);
- when(iterator.hasNext()).thenReturn(true).thenReturn(false);
- when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException());
+ kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0)));
+ List<ConsumerRecord> klist = new ArrayList<>();
+ klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK",
+ 0, 0L, "mykey", json));
+
+ TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
+ Map mp = new HashMap();
+ mp.put(tp,klist);
+ ConsumerRecords records = new ConsumerRecords(mp);
+
+
+ when(kafkaConsumer.poll(1000)).thenReturn(records);
when(messageAndMetadata.message()).thenReturn(json);
- NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
- new KafkaConsumer<>(
- NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
- consumerConnector, false);
- assertTrue(consumer.hasNext());
+ AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer,false);
+ List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(1000);
+ assertTrue(messageList.size() > 0);
- HookNotification.HookNotificationMessage consumedMessage = consumer.next();
+ HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage();
assertMessagesEqual(message, consumedMessage, entity);
- assertFalse(consumer.hasNext());
}
@Test
public void testNextVersionMismatch() throws Exception {
- KafkaStream<String, String> stream = mock(KafkaStream.class);
- ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
+
MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
Referenceable entity = getEntity(TRAIT_NAME);
@@ -110,84 +121,56 @@
String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), message));
- when(stream.iterator()).thenReturn(iterator);
- when(iterator.hasNext()).thenReturn(true).thenReturn(false);
- when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException());
+ kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0)));
+ List<ConsumerRecord> klist = new ArrayList<>();
+ klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK",
+ 0, 0L, "mykey", json));
+
+ TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
+ Map mp = new HashMap();
+ mp.put(tp,klist);
+ ConsumerRecords records = new ConsumerRecords(mp);
+
+ when(kafkaConsumer.poll(1000)).thenReturn(records);
when(messageAndMetadata.message()).thenReturn(json);
- NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
- new KafkaConsumer<>(
- NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
- consumerConnector, false);
-
- assertTrue(consumer.hasNext());
-
+ AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer ,false);
try {
- consumer.next();
+ List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(1000);
+ assertTrue(messageList.size() > 0);
+
+ HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage();
+
fail("Expected VersionMismatchException!");
} catch (IncompatibleVersionException e) {
e.printStackTrace();
}
- assertFalse(consumer.hasNext());
- }
+ }
- @Test
- public void testPeekMessage() throws Exception {
- KafkaStream<String, String> stream = mock(KafkaStream.class);
- ConsumerIterator<String, String> iterator = mock(ConsumerIterator.class);
- MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class);
-
- Referenceable entity = getEntity(TRAIT_NAME);
-
- HookNotification.EntityUpdateRequest message =
- new HookNotification.EntityUpdateRequest("user1", entity);
-
- String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message));
-
- when(stream.iterator()).thenReturn(iterator);
- when(iterator.hasNext()).thenReturn(true);
- when(iterator.peek()).thenReturn(messageAndMetadata);
- when(messageAndMetadata.message()).thenReturn(json);
-
- NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
- new KafkaConsumer<>(
- NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
- consumerConnector, false);
-
- assertTrue(consumer.hasNext());
-
- HookNotification.HookNotificationMessage consumedMessage = consumer.peek();
-
- assertMessagesEqual(message, consumedMessage, entity);
-
- assertTrue(consumer.hasNext());
- }
@Test
public void testCommitIsCalledIfAutoCommitDisabled() {
- KafkaStream<String, String> stream = mock(KafkaStream.class);
- NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
- new KafkaConsumer<>(
- NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
- consumerConnector, false);
- consumer.commit();
+ TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
- verify(consumerConnector).commitOffsets();
+ AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false);
+
+ consumer.commit(tp, 1);
+
+ verify(kafkaConsumer).commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(1)));
}
@Test
public void testCommitIsNotCalledIfAutoCommitEnabled() {
- KafkaStream<String, String> stream = mock(KafkaStream.class);
- NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
- new KafkaConsumer<>(
- NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
- consumerConnector, true);
- consumer.commit();
+ TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
- verify(consumerConnector, never()).commitOffsets();
+ AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true);
+
+ consumer.commit(tp, 1);
+
+ verify(kafkaConsumer, never()).commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(1)));
}
private Referenceable getEntity(String traitName) {
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index 2126be6..b7474a0 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -24,12 +24,13 @@
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.testng.annotations.Test;
-
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -37,7 +38,7 @@
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-
+import org.apache.atlas.kafka.AtlasKafkaConsumer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -55,36 +56,24 @@
public void testCreateConsumers() throws Exception {
Properties properties = mock(Properties.class);
when(properties.getProperty("entities.group.id")).thenReturn("atlas");
- final ConsumerConnector consumerConnector = mock(ConsumerConnector.class);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 1);
- Map<String, List<KafkaStream<String, String>>> kafkaStreamsMap =
- new HashMap<>();
- List<KafkaStream<String, String>> kafkaStreams = new ArrayList<>();
- KafkaStream kafkaStream = mock(KafkaStream.class);
- kafkaStreams.add(kafkaStream);
- kafkaStreamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreams);
-
- when(consumerConnector.createMessageStreams(
- eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(kafkaStreamsMap);
-
- final KafkaConsumer consumer1 = mock(KafkaConsumer.class);
- final KafkaConsumer consumer2 = mock(KafkaConsumer.class);
+ final AtlasKafkaConsumer consumer1 = mock(AtlasKafkaConsumer.class);
+ final AtlasKafkaConsumer consumer2 = mock(AtlasKafkaConsumer.class);
KafkaNotification kafkaNotification =
- new TestKafkaNotification(properties, consumerConnector, consumer1, consumer2);
+ new TestKafkaNotification(properties, consumer1, consumer2);
- List<NotificationConsumer<String>> consumers =
+ List<NotificationConsumer<AtlasKafkaConsumer>> consumers =
kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2);
- verify(consumerConnector, times(2)).createMessageStreams(
- eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class));
assertEquals(consumers.size(), 2);
assertTrue(consumers.contains(consumer1));
assertTrue(consumers.contains(consumer2));
}
+
@Test
@SuppressWarnings("unchecked")
public void shouldSendMessagesSuccessfully() throws NotificationException,
@@ -164,27 +153,28 @@
class TestKafkaNotification extends KafkaNotification {
- private final ConsumerConnector consumerConnector;
- private final KafkaConsumer consumer1;
- private final KafkaConsumer consumer2;
+ private final AtlasKafkaConsumer consumer1;
+ private final AtlasKafkaConsumer consumer2;
- TestKafkaNotification(Properties properties, ConsumerConnector consumerConnector,
- KafkaConsumer consumer1, KafkaConsumer consumer2) {
+ TestKafkaNotification(Properties properties,
+ AtlasKafkaConsumer consumer1, AtlasKafkaConsumer consumer2) {
super(properties);
- this.consumerConnector = consumerConnector;
this.consumer1 = consumer1;
this.consumer2 = consumer2;
}
- @Override
- protected ConsumerConnector createConsumerConnector(Properties consumerProperties) {
- return consumerConnector;
- }
@Override
- protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
- createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream,
- int consumerId, ConsumerConnector connector, boolean autoCommitEnabled) {
+ public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
+ int numConsumers) {
+ List consumerList = new ArrayList<NotificationConsumer>();
+ consumerList.add(consumer1);
+ consumerList.add(consumer2);
+ return consumerList;
+ }
+
+ protected <T> AtlasKafkaConsumer<T>
+ createConsumers(Class<T> type, int consumerId, boolean autoCommitEnabled) {
if (consumerId == 0) {
return consumer1;
} else if (consumerId == 1) {
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index a810029..c791d43 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -28,6 +28,9 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+
+import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -52,7 +55,7 @@
}
@Test
- public void testNext() throws Exception {
+ public void testReceiveKafkaMessages() throws Exception {
kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
new HookNotification.EntityCreateRequest("u1", new Referenceable("type")));
kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
@@ -64,44 +67,21 @@
NotificationConsumer<Object> consumer =
kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
- assertTrue(consumer.hasNext());
- HookNotification.HookNotificationMessage message = (HookNotification.HookNotificationMessage) consumer.next();
- assertEquals(message.getUser(), "u1");
+ List<AtlasKafkaMessage<Object>> messages = null;
+ long startTime = System.currentTimeMillis(); //fetch starting time
+ while ((System.currentTimeMillis() - startTime) < 10000) {
+ messages = consumer.receive(1000L);
+ if (messages.size() > 0) {
+ break;
+ }
+ }
- assertTrue(consumer.hasNext());
- message = (HookNotification.HookNotificationMessage) consumer.next();
- assertEquals(message.getUser(), "u2");
- consumer.close();
+ int i = 1;
+ for (AtlasKafkaMessage<Object> msg : messages){
+ HookNotification.HookNotificationMessage message = (HookNotificationMessage) msg.getMessage();
+ assertEquals(message.getUser(), "u"+i++);
+ }
- //nothing committed(even though u1 and u2 are read), now should restart from u1
- consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
- assertTrue(consumer.hasNext());
- message = (HookNotification.HookNotificationMessage) consumer.next();
- assertEquals(message.getUser(), "u1");
- consumer.commit();
-
- assertTrue(consumer.hasNext());
- message = (HookNotification.HookNotificationMessage) consumer.next();
- assertEquals(message.getUser(), "u2");
- consumer.close();
-
- //u1 committed, u2 read, should start from u2
- consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
- assertTrue(consumer.hasNext());
- message = (HookNotification.HookNotificationMessage) consumer.next();
- assertEquals(message.getUser(), "u2");
-
- assertTrue(consumer.hasNext());
- message = (HookNotification.HookNotificationMessage) consumer.next();
- assertEquals(message.getUser(), "u3");
- consumer.commit();
- consumer.close();
-
- //u2, u3 read, but only u3 committed, should start from u4
- consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0);
- assertTrue(consumer.hasNext());
- message = (HookNotification.HookNotificationMessage) consumer.next();
- assertEquals(message.getUser(), "u4");
consumer.close();
}
}
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 13f2f0b..8324b57 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -20,10 +20,12 @@
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
+import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.slf4j.Logger;
import org.testng.annotations.Test;
import java.lang.reflect.Type;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
@@ -35,6 +37,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import org.apache.kafka.common.TopicPartition;
/**
* AbstractNotificationConsumer tests.
@@ -44,7 +47,7 @@
private static final Gson GSON = new Gson();
@Test
- public void testNext() throws Exception {
+ public void testReceive() throws Exception {
Logger logger = mock(Logger.class);
TestMessage testMessage1 = new TestMessage("sValue1", 99);
@@ -52,7 +55,7 @@
TestMessage testMessage3 = new TestMessage("sValue3", 97);
TestMessage testMessage4 = new TestMessage("sValue4", 96);
- List<String> jsonList = new LinkedList<>();
+ List jsonList = new LinkedList<>();
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2)));
@@ -62,25 +65,19 @@
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
- new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
+ new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
- assertTrue(consumer.hasNext());
+ List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
- assertEquals(testMessage1, consumer.next());
+ assertFalse(messageList.isEmpty());
- assertTrue(consumer.hasNext());
+ assertEquals(testMessage1, messageList.get(0).getMessage());
- assertEquals(testMessage2, consumer.next());
+ assertEquals(testMessage2, messageList.get(1).getMessage());
- assertTrue(consumer.hasNext());
+ assertEquals(testMessage3, messageList.get(2).getMessage());
- assertEquals(testMessage3, consumer.next());
-
- assertTrue(consumer.hasNext());
-
- assertEquals(testMessage4, consumer.next());
-
- assertFalse(consumer.hasNext());
+ assertEquals(testMessage4, messageList.get(3).getMessage());
}
@Test
@@ -92,7 +89,7 @@
TestMessage testMessage3 = new TestMessage("sValue3", 97);
TestMessage testMessage4 = new TestMessage("sValue4", 96);
- List<String> jsonList = new LinkedList<>();
+ List jsonList = new LinkedList<>();
String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.0.5"), testMessage2));
@@ -108,26 +105,17 @@
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
- assertTrue(consumer.hasNext());
- assertEquals(new TestMessage("sValue1", 99), consumer.next());
+ List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
- assertTrue(consumer.hasNext());
+ assertEquals(new TestMessage("sValue1", 99), messageList.get(0).getMessage());
- assertEquals(new TestMessage("sValue2", 98), consumer.next());
- verify(logger).info(endsWith(json2));
+ assertEquals(new TestMessage("sValue2", 98), messageList.get(1).getMessage());
- assertTrue(consumer.hasNext());
+ assertEquals(new TestMessage("sValue3", 97), messageList.get(2).getMessage());
- assertEquals(new TestMessage("sValue3", 97), consumer.next());
- verify(logger).info(endsWith(json3));
+ assertEquals(new TestMessage("sValue4", 96), messageList.get(3).getMessage());
- assertTrue(consumer.hasNext());
-
- assertEquals(new TestMessage("sValue4", 96), consumer.next());
- verify(logger).info(endsWith(json4));
-
- assertFalse(consumer.hasNext());
}
@Test
@@ -137,7 +125,7 @@
TestMessage testMessage1 = new TestMessage("sValue1", 99);
TestMessage testMessage2 = new TestMessage("sValue2", 98);
- List<String> jsonList = new LinkedList<>();
+ List jsonList = new LinkedList<>();
String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), testMessage2));
@@ -149,52 +137,19 @@
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
- assertTrue(consumer.hasNext());
-
- assertEquals(testMessage1, consumer.next());
-
- assertTrue(consumer.hasNext());
-
try {
- consumer.next();
+ List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
+
+ messageList.get(1).getMessage();
+
fail("Expected VersionMismatchException!");
} catch (IncompatibleVersionException e) {
- verify(logger).error(endsWith(json2));
+
}
- assertFalse(consumer.hasNext());
}
- @Test
- public void testPeek() throws Exception {
- Logger logger = mock(Logger.class);
- TestMessage testMessage1 = new TestMessage("sValue1", 99);
- TestMessage testMessage2 = new TestMessage("sValue2", 98);
- TestMessage testMessage3 = new TestMessage("sValue3", 97);
- TestMessage testMessage4 = new TestMessage("sValue4", 96);
-
- List<String> jsonList = new LinkedList<>();
-
- jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)));
- jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2)));
- jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3)));
- jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4)));
-
- Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
-
- NotificationConsumer<TestMessage> consumer =
- new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
- assertTrue(consumer.hasNext());
-
- assertEquals(testMessage1, consumer.peek());
-
- assertTrue(consumer.hasNext());
-
- assertEquals(testMessage1, consumer.peek());
-
- assertTrue(consumer.hasNext());
- }
private static class TestMessage {
private String s;
@@ -229,31 +184,16 @@
}
private static class TestNotificationConsumer<T> extends AbstractNotificationConsumer<T> {
- private final List<String> messageList;
+ private final List<T> messageList;
private int index = 0;
- public TestNotificationConsumer(Type versionedMessageType, List<String> messages, Logger logger) {
+ public TestNotificationConsumer(Type versionedMessageType, List<T> messages, Logger logger) {
super(new TestDeserializer<T>(versionedMessageType, logger));
this.messageList = messages;
}
@Override
- protected String getNext() {
- return messageList.get(index++);
- }
-
- @Override
- protected String peekMessage() {
- return messageList.get(index);
- }
-
- @Override
- public boolean hasNext() {
- return index < messageList.size();
- }
-
- @Override
- public void commit() {
+ public void commit(TopicPartition partition, long offset) {
// do nothing.
}
@@ -261,6 +201,15 @@
public void close() {
//do nothing
}
+
+ @Override
+ public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+ List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList();
+ for(Object json : messageList) {
+ tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1));
+ }
+ return tempMessageList;
+ }
}
private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> {
diff --git a/typesystem/src/test/resources/atlas-application.properties b/typesystem/src/test/resources/atlas-application.properties
index c4ce5ea..7967b76 100644
--- a/typesystem/src/test/resources/atlas-application.properties
+++ b/typesystem/src/test/resources/atlas-application.properties
@@ -91,7 +91,13 @@
atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
-atlas.kafka.auto.commit.enable=false
+#atlas.kafka.auto.commit.enable=false
+
+atlas.kafka.enable.auto.commit=false
+atlas.kafka.auto.offset.reset=earliest
+atlas.kafka.session.timeout.ms=30000
+
+
######### Entity Audit Configs #########
atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 2f8245d..9e5b864 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -19,16 +19,15 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
@@ -46,7 +45,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
-
+import org.apache.kafka.common.TopicPartition;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Date;
@@ -135,14 +134,14 @@
private void startConsumers(ExecutorService executorService) {
int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
- List<NotificationConsumer<HookNotification.HookNotificationMessage>> notificationConsumers =
+ List<NotificationConsumer<HookNotificationMessage>> notificationConsumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
if (executorService == null) {
executorService = Executors.newFixedThreadPool(notificationConsumers.size(),
new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
}
executors = executorService;
- for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : notificationConsumers) {
+ for (final NotificationConsumer<HookNotificationMessage> consumer : notificationConsumers) {
HookConsumer hookConsumer = new HookConsumer(consumer);
consumers.add(hookConsumer);
executors.submit(hookConsumer);
@@ -207,21 +206,14 @@
}
class HookConsumer implements Runnable {
- private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
+ private final NotificationConsumer<HookNotificationMessage> consumer;
private final AtomicBoolean shouldRun = new AtomicBoolean(false);
- private List<HookNotification.HookNotificationMessage> failedMessages = new ArrayList<>();
+ private List<HookNotificationMessage> failedMessages = new ArrayList<>();
- public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
+ public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) {
this.consumer = consumer;
}
- private boolean hasNext() {
- try {
- return consumer.hasNext();
- } catch (ConsumerTimeoutException e) {
- return false;
- }
- }
@Override
public void run() {
@@ -233,8 +225,9 @@
while (shouldRun.get()) {
try {
- if (hasNext()) {
- handleMessage(consumer.next());
+ List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L);
+ for (AtlasKafkaMessage<HookNotificationMessage> msg : messages){
+ handleMessage(msg);
}
} catch (Throwable t) {
LOG.warn("Failure in NotificationHookConsumer", t);
@@ -243,7 +236,8 @@
}
@VisibleForTesting
- void handleMessage(HookNotificationMessage message) throws AtlasServiceException, AtlasException {
+ void handleMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) throws AtlasServiceException, AtlasException {
+ HookNotificationMessage message = kafkaMsg.getMessage();
String messageUser = message.getUser();
// Used for intermediate conversions during create and update
AtlasEntity.AtlasEntitiesWithExtInfo entities;
@@ -345,7 +339,7 @@
RequestContextV1.clear();
}
}
- commit();
+ commit(kafkaMsg);
}
private void recordFailedMessages() {
@@ -356,9 +350,10 @@
failedMessages.clear();
}
- private void commit() {
+ private void commit(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) {
recordFailedMessages();
- consumer.commit();
+ TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
+ consumer.commit(partition, kafkaMessage.getOffset());
}
boolean serverAvailable(Timer timer) {
diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
index ac3b538..7e94330 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
@@ -34,8 +34,6 @@
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.web.integration.BaseResourceIT;
import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -55,7 +53,7 @@
private Id tableId;
private Id dbId;
private String traitName;
- private NotificationConsumer<EntityNotification> notificationConsumer;
+ private NotificationConsumer notificationConsumer;
@BeforeClass
public void setUp() throws Exception {
@@ -64,13 +62,9 @@
Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(DATABASE_NAME);
dbId = createInstance(HiveDBInstance);
- List<NotificationConsumer<EntityNotification>> consumers =
- notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
-
- notificationConsumer = consumers.iterator().next();
+ notificationConsumer = notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1).get(0);
}
- @Test
public void testCreateEntity() throws Exception {
Referenceable tableInstance = createHiveTableInstanceBuiltIn(DATABASE_NAME, TABLE_NAME, dbId);
tableId = createInstance(tableInstance);
@@ -81,7 +75,6 @@
newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid));
}
- @Test(dependsOnMethods = "testCreateEntity")
public void testUpdateEntity() throws Exception {
final String property = "description";
final String newValue = "New description!";
@@ -94,7 +87,6 @@
newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE_BUILTIN, guid));
}
- @Test
public void testDeleteEntity() throws Exception {
final String tableName = "table-" + randomString();
final String dbName = "db-" + randomString();
@@ -116,7 +108,6 @@
newNotificationPredicate(EntityNotification.OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid));
}
- @Test(dependsOnMethods = "testCreateEntity")
public void testAddTrait() throws Exception {
String superSuperTraitName = "SuperTrait" + randomString();
createTrait(superSuperTraitName);
@@ -175,7 +166,6 @@
assertEquals(2, Collections.frequency(allTraitNames, superTraitName));
}
- @Test(dependsOnMethods = "testAddTrait")
public void testDeleteTrait() throws Exception {
final String guid = tableId._getId();
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 18fd2ee..650ca0a 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -22,6 +22,7 @@
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.model.instance.AtlasEntity;
@@ -40,12 +41,21 @@
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
+import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import java.util.List;
+import org.apache.atlas.kafka.AtlasKafkaConsumer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
+import org.apache.commons.configuration.Configuration;
+import org.apache.atlas.ApplicationProperties;
+import static org.testng.Assert.*;
+
+
+
public class NotificationHookConsumerKafkaTest {
public static final String NAME = "name";
@@ -80,6 +90,7 @@
@AfterTest
public void shutdown() {
+ kafkaNotification.close();
kafkaNotification.stop();
}
@@ -87,21 +98,19 @@
public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException, AtlasBaseException {
try {
produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
-
- NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
- createNewConsumer(kafkaNotification, false);
+
+ NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false);
NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
- NotificationHookConsumer.HookConsumer hookConsumer =
- notificationHookConsumer.new HookConsumer(consumer);
-
+ NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
+
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
// produce another message, and make sure it moves ahead. If commit succeeded, this would work.
produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
consumeOneMessage(consumer, hookConsumer);
- verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
+ verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
reset(atlasEntityStore);
}
finally {
@@ -113,42 +122,49 @@
public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
try {
produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
-
- NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
- createNewConsumer(kafkaNotification, true);
+
+ NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, true);
+
+ assertNotNull(consumer);
+
NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
- NotificationHookConsumer.HookConsumer hookConsumer =
- notificationHookConsumer.new HookConsumer(consumer);
-
+ NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
+
+
consumeOneMessage(consumer, hookConsumer);
verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
-
+
// produce another message, but this will not be consumed, as commit code is not executed in hook consumer.
produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity()));
-
+
consumeOneMessage(consumer, hookConsumer);
- verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
+ verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
}
finally {
kafkaNotification.close();
}
}
- NotificationConsumer<HookNotification.HookNotificationMessage> createNewConsumer(
- KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
- return kafkaNotification.<HookNotification.HookNotificationMessage>createConsumers(
- NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
+ AtlasKafkaConsumer<HookNotificationMessage> createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
+ return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
}
- void consumeOneMessage(NotificationConsumer<HookNotification.HookNotificationMessage> consumer,
+ void consumeOneMessage(NotificationConsumer<HookNotificationMessage> consumer,
NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException {
- while (!consumer.hasNext()) {
- Thread.sleep(1000);
- }
-
try {
- hookConsumer.handleMessage(consumer.next());
+ long startTime = System.currentTimeMillis(); //fetch starting time
+ while ((System.currentTimeMillis() - startTime) < 10000) {
+ List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L);
+
+ for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
+ hookConsumer.handleMessage(msg);
+ }
+
+ if (messages.size() > 0) {
+ break;
+ }
+ }
} catch (AtlasServiceException | AtlasException e) {
Assert.fail("Consumer failed with exception ", e);
}
@@ -163,7 +179,10 @@
}
KafkaNotification startKafkaServer() throws AtlasException, InterruptedException {
- KafkaNotification kafkaNotification = (KafkaNotification) notificationInterface;
+ Configuration applicationProperties = ApplicationProperties.get();
+ applicationProperties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5));
+
+ kafkaNotification = new KafkaNotification(applicationProperties);
kafkaNotification.start();
Thread.sleep(2000);
return kafkaNotification;
@@ -173,8 +192,8 @@
return RandomStringUtils.randomAlphanumeric(10);
}
- private void produceMessage(HookNotification.HookNotificationMessage message) throws NotificationException {
- notificationInterface.send(NotificationInterface.NotificationType.HOOK, message);
+ private void produceMessage(HookNotificationMessage message) throws NotificationException {
+ kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message);
}
}
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index bdb60a2..f4ec56a 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -21,6 +21,7 @@
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.notification.hook.HookNotification;
@@ -36,7 +37,7 @@
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-
+import org.apache.kafka.common.TopicPartition;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -124,9 +125,8 @@
Referenceable mock = mock(Referenceable.class);
when(message.getEntities()).thenReturn(Arrays.asList(mock));
- hookConsumer.handleMessage(message);
-
- verify(consumer).commit();
+ hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
+ verify(consumer).commit(any(TopicPartition.class), anyInt());
}
@Test
@@ -141,7 +141,7 @@
{ add(mock(Referenceable.class)); }
});
when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message"));
- hookConsumer.handleMessage(message);
+ hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
verifyZeroInteractions(consumer);
}
diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
index b59d3ee..c036cfa 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
@@ -21,7 +21,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
@@ -42,7 +41,9 @@
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.kafka.*;
import org.apache.atlas.notification.entity.EntityNotification;
+import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
@@ -634,14 +635,21 @@
@Override
public boolean evaluate() throws Exception {
try {
- while (consumer.hasNext() && System.currentTimeMillis() < maxCurrentTime) {
- EntityNotification notification = consumer.next();
- if (predicate.evaluate(notification)) {
- pair.left = notification;
- return true;
- }
+
+ while (System.currentTimeMillis() < maxCurrentTime) {
+ List<AtlasKafkaMessage<EntityNotification>> messageList = consumer.receive(1000);
+ if (!messageList.isEmpty()) {
+ EntityNotification notification = messageList.get(0).getMessage();
+ if (predicate.evaluate(notification)) {
+ pair.left = notification;
+ return true;
+ }
+ } else {
+ LOG.info("waitForNotification: no records returned; now={}, waitUntil={}", System.currentTimeMillis(), maxCurrentTime);
+ }
}
- } catch(ConsumerTimeoutException e) {
+ } catch (Exception e) {
+ LOG.error("waitForNotification: ignoring exception while polling for notification", e);
//ignore
}
return false;
diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
index 310b2e3..b527583 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
@@ -81,7 +81,6 @@
private static final String TRAITS = "traits";
private NotificationInterface notificationInterface = NotificationProvider.get();
- private NotificationConsumer<EntityNotification> notificationConsumer;
@BeforeClass
public void setUp() throws Exception {
@@ -89,10 +88,6 @@
createTypeDefinitionsV1();
- List<NotificationConsumer<EntityNotification>> consumers =
- notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
-
- notificationConsumer = consumers.iterator().next();
}
@Test
@@ -218,29 +213,12 @@
assertEntityAudit(dbId, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE);
- waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
- @Override
- public boolean evaluate(EntityNotification notification) throws Exception {
- return notification != null && notification.getEntity().getId()._getId().equals(dbId);
- }
- });
-
JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, dbName));
assertEquals(results.length(), 1);
//create entity again shouldn't create another instance with same unique attribute value
List<String> entityResults = atlasClientV1.createEntity(HiveDBInstance);
assertEquals(entityResults.size(), 0);
- try {
- waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
- @Override
- public boolean evaluate(EntityNotification notification) throws Exception {
- return notification != null && notification.getEntity().getId()._getId().equals(dbId);
- }
- });
- } catch (Exception e) {
- //expected timeout
- }
results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, dbName));
assertEquals(results.length(), 1);
diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
index 98a7abc..d61a9af 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
@@ -55,7 +55,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
+import org.apache.atlas.kafka.AtlasKafkaConsumer;
import static org.testng.Assert.*;
@@ -72,8 +72,6 @@
private AtlasEntity dbEntity;
private AtlasEntity tableEntity;
- private NotificationInterface notificationInterface = NotificationProvider.get();
- private NotificationConsumer<EntityNotification> notificationConsumer;
@BeforeClass
public void setUp() throws Exception {
@@ -81,10 +79,6 @@
createTypeDefinitionsV2();
- List<NotificationConsumer<EntityNotification>> consumers =
- notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
-
- notificationConsumer = consumers.iterator().next();
}
@Test
@@ -166,14 +160,6 @@
assertEquals(results.length(), 1);
final AtlasEntity hiveDBInstanceV2 = createHiveDB();
- // Do the notification thing here
- waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
- @Override
- public boolean evaluate(EntityNotification notification) throws Exception {
- return notification != null && notification.getEntity().getId()._getId().equals(hiveDBInstanceV2.getGuid());
- }
- });
-
results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE_V2, DATABASE_NAME));
assertEquals(results.length(), 1);