SAMZA-2587: IntermediateMessageSerde exception handling (#1426)

Upgrade Instructions: For users that are upgrading directly from samza version 0.13.0 or older versions: A message type of intermediate messages was introduced in samza 0.13.1. For samza 0.13.0 or older versions, the first byte of MessageType doesn't exist in the bytes. Thus, upgrading from those versions will fail. There are three ways to fix this issue:
a) Reset checkpoint to consume from newest message in the intermediate stream
b) Clean all existing messages in the intermediate stream
c) Run the application in any version between 0.13.1 and 1.5 until all old messages in intermediate stream has reached retention time.

Co-authored-by: Yixing Zhang <yixzhang@linkedin.com>
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
index a8f9852..83a0a35 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
@@ -67,7 +67,21 @@
   public Object fromBytes(byte[] bytes) {
     try {
       final Object object;
-      final MessageType type = MessageType.values()[bytes[0]];
+      final MessageType type;
+      try {
+        type = MessageType.values()[bytes[0]];
+      } catch (ArrayIndexOutOfBoundsException e) {
+        // The message type was introduced in samza 0.13.1. For samza 0.13.0 or older versions, the first byte of
+        // MessageType doesn't exist in the bytes. Thus, upgrading from those versions will get this exception.
+        // There are three ways to solve this issue:
+        // a) Reset checkpoint to consume from newest message in the intermediate stream
+        // b) clean all existing messages in the intermediate stream
+        // c) Run the application in any version between 0.13.1 and 1.5 until all old messages in intermediate stream
+        // has reached retention time.
+        throw new SamzaException("Error reading the message type from intermediate message. This may happen if you "
+            + "have recently upgraded from samza version older than 0.13.1 or there are still old messages in the "
+            + "intermediate stream.", e);
+      }
       final byte[] data = Arrays.copyOfRange(bytes, 1, bytes.length);
       switch (type) {
         case USER_MESSAGE:
@@ -83,21 +97,10 @@
           throw new UnsupportedOperationException(String.format("Message type %s is not supported", type.name()));
       }
       return object;
-
     } catch (UnsupportedOperationException ue) {
       throw new SamzaException(ue);
     } catch (Exception e) {
-      // This will catch the following exceptions:
-      // 1) the first byte is not a valid type so it will cause ArrayOutOfBound exception
-      // 2) the first byte happens to be a valid type, but the deserialization fails with certain exception
-      // For these cases, we fall back to user-provided serde
-      try {
-        return userMessageSerde.fromBytes(bytes);
-      } catch (Exception umse) {
-        LOGGER.error("Error deserializing from both intermediate message serde and user message serde. "
-            + "Original exception: ", e);
-        throw umse;
-      }
+      throw e;
     }
   }
 
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
index 7a3faca..22250fc 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
@@ -24,15 +24,20 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.Arrays;
 import org.apache.samza.serializers.IntermediateMessageSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.system.MessageType;
 import org.apache.samza.system.WatermarkMessage;
+import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
 
 
 public class TestIntermediateMessageSerde {
@@ -126,4 +131,26 @@
     assertEquals(de.getTaskName(), taskName);
     assertEquals(de.getVersion(), 1);
   }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testUserMessageSerdeException() {
+    Serde<?> mockUserMessageSerde = mock(Serde.class);
+    when(mockUserMessageSerde.fromBytes(anyObject())).then(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        byte[] bytes = invocation.getArgumentAt(0, byte[].class);
+        if (Arrays.equals(bytes, new byte[]{1, 2})) {
+          throw new IllegalArgumentException("User message serde failed to deserialize this message.");
+        } else {
+          // Intermediate message serde shouldn't try to deserialize user message with wrong bytes
+          Assert.fail();
+          return null;
+        }
+      }
+    });
+
+    IntermediateMessageSerde imserde = new IntermediateMessageSerde(mockUserMessageSerde);
+    byte[] bytes = new byte[]{0, 1, 2};
+    imserde.fromBytes(bytes);
+  }
 }