ATLAS-1908: updated to use existing Kafka consumer properties when equivalent new Kafka consumer properties are not present
Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index c3213df..29a4cc1 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -79,7 +79,7 @@
atlas.kafka.enable.auto.commit=false
atlas.kafka.auto.offset.reset=earliest
atlas.kafka.session.timeout.ms=30000
-
+atlas.kafka.poll.timeout.ms=1000
atlas.notification.create.topics=true
atlas.notification.replicas=1
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index 9c15243..52d0916 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -41,19 +41,20 @@
private static final Logger LOG = LoggerFactory.getLogger(AtlasKafkaConsumer.class);
private final KafkaConsumer kafkaConsumer;
- private final boolean autoCommitEnabled;
+ private final boolean autoCommitEnabled;
+ private long pollTimeoutMilliSeconds = 1000L;
- public AtlasKafkaConsumer(MessageDeserializer<T> deserializer, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled) {
+ public AtlasKafkaConsumer(MessageDeserializer<T> deserializer, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) {
super(deserializer);
-
- this.kafkaConsumer = kafkaConsumer;
+ this.kafkaConsumer = kafkaConsumer;
this.autoCommitEnabled = autoCommitEnabled;
+ this.pollTimeoutMilliSeconds = pollTimeoutMilliSeconds;
}
- public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+ public List<AtlasKafkaMessage<T>> receive() {
List<AtlasKafkaMessage<T>> messages = new ArrayList();
- ConsumerRecords<?, ?> records = kafkaConsumer.poll(timeoutMilliSeconds);
+ ConsumerRecords<?, ?> records = kafkaConsumer.poll(pollTimeoutMilliSeconds);
if (records != null) {
for (ConsumerRecord<?, ?> record : records) {
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 366c8a7..38889ef 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -83,6 +83,7 @@
private Properties properties;
private KafkaConsumer consumer = null;
private KafkaProducer producer = null;
+ private Long pollTimeOutMs = 1000L;
private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() {
{
@@ -124,6 +125,13 @@
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ pollTimeOutMs = subsetConfiguration.getLong("poll.timeout.ms", 1000);
+ boolean oldApiCommitEnbleFlag = subsetConfiguration.getBoolean("auto.commit.enable",false);
+ //set old autocommit value if new autoCommit property is not set.
+ properties.put("enable.auto.commit", subsetConfiguration.getBoolean("enable.auto.commit", oldApiCommitEnbleFlag));
+ properties.put("session.timeout.ms", subsetConfiguration.getString("session.timeout.ms", "30000"));
+
}
@VisibleForTesting
@@ -167,7 +175,7 @@
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
int numConsumers) {
return createConsumers(notificationType, numConsumers,
- Boolean.valueOf(properties.getProperty("enable.auto.commit", "true")));
+ Boolean.valueOf(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable","false"))));
}
@VisibleForTesting
@@ -177,7 +185,7 @@
Properties consumerProperties = getConsumerProperties(notificationType);
List<NotificationConsumer<T>> consumers = new ArrayList<>();
- AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType.getDeserializer(), getKafkaConsumer(consumerProperties,notificationType, autoCommitEnabled), autoCommitEnabled);
+ AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType.getDeserializer(), getKafkaConsumer(consumerProperties,notificationType, autoCommitEnabled), autoCommitEnabled, pollTimeOutMs );
consumers.add(kafkaConsumer);
return consumers;
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 22e40f9..6d1c08a 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -41,8 +41,7 @@
/**
* Fetch data for the topics from Kafka
- * @param timeoutMilliSeconds poll timeout
* @return List containing kafka message and partionId and offset.
*/
- List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds);
+ List<AtlasKafkaMessage<T>> receive();
}
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 70059cb..9b712f4 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -95,12 +95,12 @@
ConsumerRecords records = new ConsumerRecords(mp);
- when(kafkaConsumer.poll(1000)).thenReturn(records);
+ when(kafkaConsumer.poll(100)).thenReturn(records);
when(messageAndMetadata.message()).thenReturn(json);
- AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer,false);
- List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(1000);
+ AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L);
+ List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive();
assertTrue(messageList.size() > 0);
HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage();
@@ -131,12 +131,12 @@
mp.put(tp,klist);
ConsumerRecords records = new ConsumerRecords(mp);
- when(kafkaConsumer.poll(1000)).thenReturn(records);
+ when(kafkaConsumer.poll(100L)).thenReturn(records);
when(messageAndMetadata.message()).thenReturn(json);
- AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer ,false);
+ AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer ,false, 100L);
try {
- List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(1000);
+ List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive();
assertTrue(messageList.size() > 0);
HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage();
@@ -154,7 +154,7 @@
TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
- AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false);
+ AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L);
consumer.commit(tp, 1);
@@ -166,7 +166,7 @@
TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
- AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true);
+ AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true , 100L);
consumer.commit(tp, 1);
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index c791d43..a1e13b9 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -70,7 +70,7 @@
List<AtlasKafkaMessage<Object>> messages = null ;
long startTime = System.currentTimeMillis(); //fetch starting time
while ((System.currentTimeMillis() - startTime) < 10000) {
- messages = consumer.receive(1000L);
+ messages = consumer.receive();
if (messages.size() > 0) {
break;
}
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 8324b57..68fe3d7 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -67,7 +67,7 @@
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
- List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
+ List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
assertFalse(messageList.isEmpty());
@@ -106,7 +106,7 @@
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
- List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
+ List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
assertEquals(new TestMessage("sValue1", 99), messageList.get(0).getMessage());
@@ -138,7 +138,7 @@
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
try {
- List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
+ List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
messageList.get(1).getMessage();
@@ -203,7 +203,7 @@
}
@Override
- public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+ public List<AtlasKafkaMessage<T>> receive() {
List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList();
for(Object json : messageList) {
tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1));
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 0dea0e2..51276d3 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -224,7 +224,7 @@
while (shouldRun.get()) {
try {
- List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L);
+ List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
handleMessage(msg);
}
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 650ca0a..eb37fa8 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -155,7 +155,7 @@
try {
long startTime = System.currentTimeMillis(); //fetch starting time
while ((System.currentTimeMillis() - startTime) < 10000) {
- List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L);
+ List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
hookConsumer.handleMessage(msg);
diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
index c036cfa..496185f 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
@@ -637,7 +637,7 @@
try {
while (System.currentTimeMillis() < maxCurrentTime) {
- List<AtlasKafkaMessage<EntityNotification>> messageList = consumer.receive(1000);
+ List<AtlasKafkaMessage<EntityNotification>> messageList = consumer.receive();
if(messageList.size() > 0) {
EntityNotification notification = messageList.get(0).getMessage();
if (predicate.evaluate(notification)) {