Merge pull request #8677 from Riduidel/fix/rabbitmq-message-not-serializable
[BEAM-7414] fix for message being not serializable due to LongString in headers
diff --git a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
index 4a91035..fef40d2 100644
--- a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
+++ b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
@@ -71,8 +71,8 @@
* instead of directly from a queue:
*
* <pre>{@code
- * PCollection<RabbitMqMessage> messages = pipeline.apply(
- * RabbitMqIO.read().withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE", "fanout", "QUEUE"));
+ * PCollection<RabbitMqMessage> messages = pipeline.apply(RabbitMqIO.read()
+ * .withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE", "fanout", "QUEUE"));
* }</pre>
*
* <h3>Publishing messages to RabbitMQ server</h3>
diff --git a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java
index 015d1af..1c24195 100644
--- a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java
+++ b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java
@@ -18,7 +18,11 @@
package org.apache.beam.sdk.io.rabbitmq;
import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.AMQP.BasicProperties;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.LongString;
import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.QueueingConsumer.Delivery;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Date;
@@ -33,6 +37,67 @@
*/
public class RabbitMqMessage implements Serializable {
+ /**
+ * Make delivery serializable by cloning all non-serializable values into serializable ones. If it
+ * is not possible, initial delivery is returned and error message is logged
+ *
+ * @param processed
+ * @return
+ */
+ private static Delivery serializableDeliveryOf(Delivery processed) {
+ // All content of envelope is serializable, so no problem there
+ Envelope envelope = processed.getEnvelope();
+ // in basicproperties, there may be LongString, which are *not* serializable
+ BasicProperties properties = processed.getProperties();
+ BasicProperties nextProperties =
+ new BasicProperties.Builder()
+ .appId(properties.getAppId())
+ .clusterId(properties.getClusterId())
+ .contentEncoding(properties.getContentEncoding())
+ .contentType(properties.getContentType())
+ .correlationId(properties.getCorrelationId())
+ .deliveryMode(properties.getDeliveryMode())
+ .expiration(properties.getExpiration())
+ .headers(serializableHeaders(properties.getHeaders()))
+ .messageId(properties.getMessageId())
+ .priority(properties.getPriority())
+ .replyTo(properties.getReplyTo())
+ .timestamp(properties.getTimestamp())
+ .type(properties.getType())
+ .userId(properties.getUserId())
+ .build();
+ return new Delivery(envelope, nextProperties, processed.getBody());
+ }
+
+ private static Map<String, Object> serializableHeaders(Map<String, Object> headers) {
+ Map<String, Object> returned = new HashMap<>();
+ if (headers != null) {
+ for (Map.Entry<String, Object> h : headers.entrySet()) {
+ Object value = h.getValue();
+ if (!(value instanceof Serializable)) {
+ try {
+ if (value instanceof LongString) {
+ LongString longString = (LongString) value;
+ byte[] bytes = longString.getBytes();
+ String s = new String(bytes, "UTF-8");
+ value = s;
+ } else {
+ throw new RuntimeException(String.format("no transformation defined for %s", value));
+ }
+ } catch (Throwable t) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "can't make unserializable value %s a serializable value (which is mandatory for Apache Beam dataflow implementation)",
+ value),
+ t);
+ }
+ }
+ returned.put(h.getKey(), value);
+ }
+ }
+ return returned;
+ }
+
@Nullable private final String routingKey;
private final byte[] body;
private final String contentType;
@@ -71,6 +136,7 @@
public RabbitMqMessage(String routingKey, QueueingConsumer.Delivery delivery) {
this.routingKey = routingKey;
+ delivery = serializableDeliveryOf(delivery);
body = delivery.getBody();
contentType = delivery.getProperties().getContentType();
contentEncoding = delivery.getProperties().getContentEncoding();