Error handling in kafka serializers
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java
index 9af5a59..f787fe7 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java
@@ -44,8 +44,6 @@
LOGGER.info("Received data orchestrator records {}", partitionRecords.size());
for (ConsumerRecord<String, NotificationEvent> record : partitionRecords) {
-
-
try {
callback.process(record.value());
} catch (Exception exception) {
@@ -53,8 +51,6 @@
}finally {
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
}
-
-
}
}
}
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
index bf0265d..a14d4ce 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
@@ -2,6 +2,8 @@
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
@@ -9,6 +11,9 @@
* Notification event deserializer
*/
public class NotificationEventDeserializer implements Deserializer<NotificationEvent> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(NotificationEventDeserializer.class);
+
@Override
public void configure(Map<String, ?> map, boolean b) {
@@ -17,8 +22,13 @@
@Override
public NotificationEvent deserialize(String topic, byte[] bytes) {
String deserialized = new String(bytes);
- Gson gson = new Gson();
- return gson.fromJson(deserialized, NotificationEvent.class);
+ try {
+ Gson gson = new Gson();
+ return gson.fromJson(deserialized, NotificationEvent.class);
+ } catch (Exception e) {
+ LOGGER.error("Failed to deserialize the message {}. So returning null", deserialized, e);
+ return null;
+ }
}
@Override
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
index 3ebe998..a98d477 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
@@ -2,6 +2,8 @@
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@@ -10,6 +12,9 @@
* Notification event serializer
*/
public class NotificationEventSerializer implements Serializer<NotificationEvent> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(NotificationEventSerializer.class);
+
@Override
public void configure(Map<String, ?> map, boolean b) {
@@ -18,8 +23,13 @@
@Override
public byte[] serialize(String s, NotificationEvent notificationEvent) {
- Gson gson = new Gson();
- return gson.toJson(notificationEvent).getBytes(StandardCharsets.UTF_8);
+ try {
+ Gson gson = new Gson();
+ return gson.toJson(notificationEvent).getBytes(StandardCharsets.UTF_8);
+ } catch (Exception e) {
+ LOGGER.error("Failed to serialize message {}. So returning null", s, e);
+ return null;
+ }
}
@Override