ATLAS-1908: updated to use existing Kafka consumer properties when equivalent new Kafka consumer properties are not present

Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index c3213df..29a4cc1 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -79,7 +79,7 @@
 atlas.kafka.enable.auto.commit=false
 atlas.kafka.auto.offset.reset=earliest
 atlas.kafka.session.timeout.ms=30000
-
+atlas.kafka.poll.timeout.ms=1000
 
 atlas.notification.create.topics=true
 atlas.notification.replicas=1
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index 9c15243..52d0916 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -41,19 +41,20 @@
     private static final Logger LOG = LoggerFactory.getLogger(AtlasKafkaConsumer.class);
 
     private final KafkaConsumer kafkaConsumer;
-    private final boolean       autoCommitEnabled;
+    private final boolean autoCommitEnabled;
+    private long pollTimeoutMilliSeconds = 1000L;
 
-    public AtlasKafkaConsumer(MessageDeserializer<T> deserializer, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled) {
+    public AtlasKafkaConsumer(MessageDeserializer<T> deserializer, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) {
         super(deserializer);
-
-        this.kafkaConsumer     = kafkaConsumer;
+        this.kafkaConsumer = kafkaConsumer;
         this.autoCommitEnabled = autoCommitEnabled;
+        this.pollTimeoutMilliSeconds = pollTimeoutMilliSeconds;
     }
 
-    public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+    public List<AtlasKafkaMessage<T>> receive() {
         List<AtlasKafkaMessage<T>> messages = new ArrayList();
 
-        ConsumerRecords<?, ?> records = kafkaConsumer.poll(timeoutMilliSeconds);
+        ConsumerRecords<?, ?> records = kafkaConsumer.poll(pollTimeoutMilliSeconds);
 
         if (records != null) {
             for (ConsumerRecord<?, ?> record : records) {
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 366c8a7..38889ef 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -83,6 +83,7 @@
     private Properties properties;
     private KafkaConsumer consumer = null;
     private KafkaProducer producer = null;
+    private Long pollTimeOutMs = 1000L;
 
     private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() {
         {
@@ -124,6 +125,13 @@
         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        pollTimeOutMs = subsetConfiguration.getLong("poll.timeout.ms", 1000);
+        boolean oldApiCommitEnbleFlag = subsetConfiguration.getBoolean("auto.commit.enable",false);
+        //set old autocommit value if new autoCommit property is not set.
+        properties.put("enable.auto.commit", subsetConfiguration.getBoolean("enable.auto.commit", oldApiCommitEnbleFlag));
+        properties.put("session.timeout.ms", subsetConfiguration.getString("session.timeout.ms", "30000"));
+
     }
 
     @VisibleForTesting
@@ -167,7 +175,7 @@
     public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
                                                              int numConsumers) {
         return createConsumers(notificationType, numConsumers,
-                Boolean.valueOf(properties.getProperty("enable.auto.commit", "true")));
+                Boolean.valueOf(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable","false"))));
     }
 
     @VisibleForTesting
@@ -177,7 +185,7 @@
         Properties consumerProperties = getConsumerProperties(notificationType);
 
         List<NotificationConsumer<T>> consumers = new ArrayList<>();
-        AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType.getDeserializer(), getKafkaConsumer(consumerProperties,notificationType, autoCommitEnabled), autoCommitEnabled);
+        AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType.getDeserializer(), getKafkaConsumer(consumerProperties,notificationType, autoCommitEnabled), autoCommitEnabled, pollTimeOutMs );
         consumers.add(kafkaConsumer);
         return consumers;
     }
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 22e40f9..6d1c08a 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -41,8 +41,7 @@
 
     /**
      * Fetch data for the topics from Kafka
-     * @param timeoutMilliSeconds poll timeout
      * @return List containing kafka message and partionId and offset.
      */
-    List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds);
+    List<AtlasKafkaMessage<T>> receive();
 }
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 70059cb..9b712f4 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -95,12 +95,12 @@
         ConsumerRecords records = new ConsumerRecords(mp);
 
 
-        when(kafkaConsumer.poll(1000)).thenReturn(records);
+        when(kafkaConsumer.poll(100)).thenReturn(records);
         when(messageAndMetadata.message()).thenReturn(json);
 
 
-        AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer,false);
-        List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(1000);
+        AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L);
+        List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive();
         assertTrue(messageList.size() > 0);
 
         HookNotification.HookNotificationMessage consumedMessage  = messageList.get(0).getMessage();
@@ -131,12 +131,12 @@
         mp.put(tp,klist);
         ConsumerRecords records = new ConsumerRecords(mp);
 
-        when(kafkaConsumer.poll(1000)).thenReturn(records);
+        when(kafkaConsumer.poll(100L)).thenReturn(records);
         when(messageAndMetadata.message()).thenReturn(json);
 
-        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer ,false);
+        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer ,false, 100L);
         try {
-            List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(1000);
+            List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive();
             assertTrue(messageList.size() > 0);
 
             HookNotification.HookNotificationMessage consumedMessage  = messageList.get(0).getMessage();
@@ -154,7 +154,7 @@
 
         TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
 
-        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false);
+        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L);
 
         consumer.commit(tp, 1);
 
@@ -166,7 +166,7 @@
 
         TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
 
-        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true);
+        AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true , 100L);
 
         consumer.commit(tp, 1);
 
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index c791d43..a1e13b9 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -70,7 +70,7 @@
         List<AtlasKafkaMessage<Object>> messages = null ;
         long startTime = System.currentTimeMillis(); //fetch starting time
         while ((System.currentTimeMillis() - startTime) < 10000) {
-             messages = consumer.receive(1000L);
+             messages = consumer.receive();
             if (messages.size() > 0) {
                 break;
             }
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 8324b57..68fe3d7 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -67,7 +67,7 @@
         NotificationConsumer<TestMessage> consumer =
                 new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
 
-        List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
+        List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
 
         assertFalse(messageList.isEmpty());
 
@@ -106,7 +106,7 @@
         NotificationConsumer<TestMessage> consumer =
             new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
 
-        List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
+        List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
 
         assertEquals(new TestMessage("sValue1", 99), messageList.get(0).getMessage());
 
@@ -138,7 +138,7 @@
         NotificationConsumer<TestMessage> consumer =
             new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
         try {
-            List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
+            List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
 
             messageList.get(1).getMessage();
 
@@ -203,7 +203,7 @@
         }
 
         @Override
-        public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+        public List<AtlasKafkaMessage<T>> receive() {
             List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList();
             for(Object json :  messageList) {
                 tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1));
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 0dea0e2..51276d3 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -224,7 +224,7 @@
 
             while (shouldRun.get()) {
                 try {
-                    List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L);
+                    List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
                     for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
                         handleMessage(msg);
                     }
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 650ca0a..eb37fa8 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -155,7 +155,7 @@
         try {
             long startTime = System.currentTimeMillis(); //fetch starting time
             while ((System.currentTimeMillis() - startTime) < 10000) {
-                List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L);
+                List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive();
 
                 for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
                     hookConsumer.handleMessage(msg);
diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
index c036cfa..496185f 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
@@ -637,7 +637,7 @@
                 try {
 
                     while (System.currentTimeMillis() < maxCurrentTime) {
-                        List<AtlasKafkaMessage<EntityNotification>> messageList = consumer.receive(1000);
+                        List<AtlasKafkaMessage<EntityNotification>> messageList = consumer.receive();
                             if(messageList.size() > 0) {
                                 EntityNotification notification = messageList.get(0).getMessage();
                                 if (predicate.evaluate(notification)) {