According to jbonofre, the transformation of deliverable into serializable objects lies in RabbitMqMessage
And i agree, it's way better.
diff --git a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
index 6ba12bf..0f0142e 100644
--- a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
+++ b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
@@ -20,15 +20,12 @@
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
 
 import com.google.auto.value.AutoValue;
-import com.rabbitmq.client.AMQP.BasicProperties;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.LongString;
 import com.rabbitmq.client.MessageProperties;
 import com.rabbitmq.client.QueueingConsumer;
-import com.rabbitmq.client.QueueingConsumer.Delivery;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URISyntaxException;
@@ -36,9 +33,7 @@
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
@@ -459,7 +454,6 @@
         if (delivery == null) {
           return false;
         }
-        delivery = serializableDeliveryOf(delivery);
         if (source.spec.useCorrelationId()) {
           String correlationId = delivery.getProperties().getCorrelationId();
           if (correlationId == null) {
@@ -483,68 +477,6 @@
       return true;
     }
 
-    /**
-     * ake delivery serializable by cloning all non-serializable values into serializable ones. If
-     * it is not possible, initial delivery is returned and error message is logged
-     *
-     * @param processed
-     * @return
-     */
-    private Delivery serializableDeliveryOf(Delivery processed) {
-      // All content of envelope is serializable, so no problem there
-      Envelope envelope = processed.getEnvelope();
-      // in basicproperties, there may be LongString, which are *not* serializable
-      BasicProperties properties = processed.getProperties();
-      BasicProperties nextProperties =
-          new BasicProperties.Builder()
-              .appId(properties.getAppId())
-              .clusterId(properties.getClusterId())
-              .contentEncoding(properties.getContentEncoding())
-              .contentType(properties.getContentType())
-              .correlationId(properties.getCorrelationId())
-              .deliveryMode(properties.getDeliveryMode())
-              .expiration(properties.getExpiration())
-              .headers(serializableHeaders(properties.getHeaders()))
-              .messageId(properties.getMessageId())
-              .priority(properties.getPriority())
-              .replyTo(properties.getReplyTo())
-              .timestamp(properties.getTimestamp())
-              .type(properties.getType())
-              .userId(properties.getUserId())
-              .build();
-      return new Delivery(envelope, nextProperties, processed.getBody());
-    }
-
-    private Map<String, Object> serializableHeaders(Map<String, Object> headers) {
-      Map<String, Object> returned = new HashMap<>();
-      if (headers != null) {
-        for (Map.Entry<String, Object> h : headers.entrySet()) {
-          Object value = h.getValue();
-          if (!(value instanceof Serializable)) {
-            try {
-              if (value instanceof LongString) {
-                LongString longString = (LongString) value;
-                byte[] bytes = longString.getBytes();
-                String s = new String(bytes, "UTF-8");
-                value = s;
-              } else {
-                throw new RuntimeException(
-                    String.format("no transformation defined for %s", value));
-              }
-            } catch (Throwable t) {
-              throw new UnsupportedOperationException(
-                  String.format(
-                      "can't make unserializable value %s a serializable value (which is mandatory for Apache Beam dataflow implementation)",
-                      value),
-                  t);
-            }
-          }
-          returned.put(h.getKey(), value);
-        }
-      }
-      return returned;
-    }
-
     @Override
     public void close() throws IOException {
       if (connectionHandler != null) {
diff --git a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java
index 015d1af..1c24195 100644
--- a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java
+++ b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java
@@ -18,7 +18,11 @@
 package org.apache.beam.sdk.io.rabbitmq;
 
 import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.AMQP.BasicProperties;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.LongString;
 import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.QueueingConsumer.Delivery;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Date;
@@ -33,6 +37,67 @@
  */
 public class RabbitMqMessage implements Serializable {
 
+  /**
+   * Make delivery serializable by cloning all non-serializable values into serializable ones. If it
+   * is not possible, initial delivery is returned and error message is logged
+   *
+   * @param processed
+   * @return
+   */
+  private static Delivery serializableDeliveryOf(Delivery processed) {
+    // All content of envelope is serializable, so no problem there
+    Envelope envelope = processed.getEnvelope();
+    // in basicproperties, there may be LongString, which are *not* serializable
+    BasicProperties properties = processed.getProperties();
+    BasicProperties nextProperties =
+        new BasicProperties.Builder()
+            .appId(properties.getAppId())
+            .clusterId(properties.getClusterId())
+            .contentEncoding(properties.getContentEncoding())
+            .contentType(properties.getContentType())
+            .correlationId(properties.getCorrelationId())
+            .deliveryMode(properties.getDeliveryMode())
+            .expiration(properties.getExpiration())
+            .headers(serializableHeaders(properties.getHeaders()))
+            .messageId(properties.getMessageId())
+            .priority(properties.getPriority())
+            .replyTo(properties.getReplyTo())
+            .timestamp(properties.getTimestamp())
+            .type(properties.getType())
+            .userId(properties.getUserId())
+            .build();
+    return new Delivery(envelope, nextProperties, processed.getBody());
+  }
+
+  private static Map<String, Object> serializableHeaders(Map<String, Object> headers) {
+    Map<String, Object> returned = new HashMap<>();
+    if (headers != null) {
+      for (Map.Entry<String, Object> h : headers.entrySet()) {
+        Object value = h.getValue();
+        if (!(value instanceof Serializable)) {
+          try {
+            if (value instanceof LongString) {
+              LongString longString = (LongString) value;
+              byte[] bytes = longString.getBytes();
+              String s = new String(bytes, "UTF-8");
+              value = s;
+            } else {
+              throw new RuntimeException(String.format("no transformation defined for %s", value));
+            }
+          } catch (Throwable t) {
+            throw new UnsupportedOperationException(
+                String.format(
+                    "can't make unserializable value %s a serializable value (which is mandatory for Apache Beam dataflow implementation)",
+                    value),
+                t);
+          }
+        }
+        returned.put(h.getKey(), value);
+      }
+    }
+    return returned;
+  }
+
   @Nullable private final String routingKey;
   private final byte[] body;
   private final String contentType;
@@ -71,6 +136,7 @@
 
   public RabbitMqMessage(String routingKey, QueueingConsumer.Delivery delivery) {
     this.routingKey = routingKey;
+    delivery = serializableDeliveryOf(delivery);
     body = delivery.getBody();
     contentType = delivery.getProperties().getContentType();
     contentEncoding = delivery.getProperties().getContentEncoding();