Merge pull request #8677 from Riduidel/fix/rabbitmq-message-not-serializable

[BEAM-7414] fix for message being not serializable due to LongString in headers
diff --git a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
index 4a91035..fef40d2 100644
--- a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
+++ b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
@@ -71,8 +71,8 @@
  * instead of directly from a queue:
  *
  * <pre>{@code
- * PCollection<RabbitMqMessage> messages = pipeline.apply(
- *   RabbitMqIO.read().withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE", "fanout", "QUEUE"));
+ * 	PCollection<RabbitMqMessage> messages = pipeline.apply(RabbitMqIO.read()
+ * 			.withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE", "fanout", "QUEUE"));
  * }</pre>
  *
  * <h3>Publishing messages to RabbitMQ server</h3>
diff --git a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java
index 015d1af..1c24195 100644
--- a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java
+++ b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java
@@ -18,7 +18,11 @@
 package org.apache.beam.sdk.io.rabbitmq;
 
 import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.AMQP.BasicProperties;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.LongString;
 import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.QueueingConsumer.Delivery;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Date;
@@ -33,6 +37,67 @@
  */
 public class RabbitMqMessage implements Serializable {
 
+  /**
+   * Make delivery serializable by cloning all non-serializable values into serializable ones. If it
+   * is not possible, initial delivery is returned and error message is logged
+   *
+   * @param processed
+   * @return
+   */
+  private static Delivery serializableDeliveryOf(Delivery processed) {
+    // All content of envelope is serializable, so no problem there
+    Envelope envelope = processed.getEnvelope();
+    // in basicproperties, there may be LongString, which are *not* serializable
+    BasicProperties properties = processed.getProperties();
+    BasicProperties nextProperties =
+        new BasicProperties.Builder()
+            .appId(properties.getAppId())
+            .clusterId(properties.getClusterId())
+            .contentEncoding(properties.getContentEncoding())
+            .contentType(properties.getContentType())
+            .correlationId(properties.getCorrelationId())
+            .deliveryMode(properties.getDeliveryMode())
+            .expiration(properties.getExpiration())
+            .headers(serializableHeaders(properties.getHeaders()))
+            .messageId(properties.getMessageId())
+            .priority(properties.getPriority())
+            .replyTo(properties.getReplyTo())
+            .timestamp(properties.getTimestamp())
+            .type(properties.getType())
+            .userId(properties.getUserId())
+            .build();
+    return new Delivery(envelope, nextProperties, processed.getBody());
+  }
+
+  private static Map<String, Object> serializableHeaders(Map<String, Object> headers) {
+    Map<String, Object> returned = new HashMap<>();
+    if (headers != null) {
+      for (Map.Entry<String, Object> h : headers.entrySet()) {
+        Object value = h.getValue();
+        if (!(value instanceof Serializable)) {
+          try {
+            if (value instanceof LongString) {
+              LongString longString = (LongString) value;
+              byte[] bytes = longString.getBytes();
+              String s = new String(bytes, "UTF-8");
+              value = s;
+            } else {
+              throw new RuntimeException(String.format("no transformation defined for %s", value));
+            }
+          } catch (Throwable t) {
+            throw new UnsupportedOperationException(
+                String.format(
+                    "can't make unserializable value %s a serializable value (which is mandatory for Apache Beam dataflow implementation)",
+                    value),
+                t);
+          }
+        }
+        returned.put(h.getKey(), value);
+      }
+    }
+    return returned;
+  }
+
   @Nullable private final String routingKey;
   private final byte[] body;
   private final String contentType;
@@ -71,6 +136,7 @@
 
   public RabbitMqMessage(String routingKey, QueueingConsumer.Delivery delivery) {
     this.routingKey = routingKey;
+    delivery = serializableDeliveryOf(delivery);
     body = delivery.getBody();
     contentType = delivery.getProperties().getContentType();
     contentEncoding = delivery.getProperties().getContentEncoding();