Error handling in kafka serializers
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java
index 9af5a59..f787fe7 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java
@@ -44,8 +44,6 @@
                     LOGGER.info("Received data orchestrator records {}", partitionRecords.size());
 
                     for (ConsumerRecord<String, NotificationEvent> record : partitionRecords) {
-
-
                         try {
                             callback.process(record.value());
                         } catch (Exception exception) {
@@ -53,8 +51,6 @@
                         }finally {
                             consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
                         }
-
-
                     }
                 }
             }
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
index bf0265d..a14d4ce 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
@@ -2,6 +2,8 @@
 
 import com.google.gson.Gson;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
@@ -9,6 +11,9 @@
  * Notification event deserializer
  */
 public class NotificationEventDeserializer implements Deserializer<NotificationEvent> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationEventDeserializer.class);
+
     @Override
     public void configure(Map<String, ?> map, boolean b) {
 
@@ -17,8 +22,13 @@
     @Override
     public NotificationEvent deserialize(String topic, byte[] bytes) {
         String deserialized = new String(bytes);
-        Gson gson = new Gson();
-        return gson.fromJson(deserialized, NotificationEvent.class);
+        try {
+            Gson gson = new Gson();
+            return gson.fromJson(deserialized, NotificationEvent.class);
+        } catch (Exception e) {
+            LOGGER.error("Failed to deserialize the message {}. So returning null", deserialized, e);
+            return null;
+        }
     }
 
     @Override
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
index 3ebe998..a98d477 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
@@ -2,6 +2,8 @@
 
 import com.google.gson.Gson;
 import org.apache.kafka.common.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
@@ -10,6 +12,9 @@
  * Notification event serializer
  */
 public class NotificationEventSerializer implements Serializer<NotificationEvent> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationEventSerializer.class);
+
     @Override
     public void configure(Map<String, ?> map, boolean b) {
 
@@ -18,8 +23,13 @@
     @Override
     public byte[] serialize(String s, NotificationEvent notificationEvent) {
 
-        Gson gson = new Gson();
-        return gson.toJson(notificationEvent).getBytes(StandardCharsets.UTF_8);
+        try {
+            Gson gson = new Gson();
+            return gson.toJson(notificationEvent).getBytes(StandardCharsets.UTF_8);
+        } catch (Exception e) {
+            LOGGER.error("Failed to serialize message {}. So returning null", s, e);
+            return null;
+        }
     }
 
     @Override