According to jbonofre, the transformation of deliverable into serializable objects lies in RabbitMqMessage
And i agree, it's way better.
diff --git a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
index 6ba12bf..0f0142e 100644
--- a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
+++ b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
@@ -20,15 +20,12 @@
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import com.google.auto.value.AutoValue;
-import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.LongString;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;
-import com.rabbitmq.client.QueueingConsumer.Delivery;
+
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
@@ -36,9 +33,7 @@
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
@@ -459,7 +454,6 @@
if (delivery == null) {
return false;
}
- delivery = serializableDeliveryOf(delivery);
if (source.spec.useCorrelationId()) {
String correlationId = delivery.getProperties().getCorrelationId();
if (correlationId == null) {
@@ -483,68 +477,6 @@
return true;
}
- /**
- * ake delivery serializable by cloning all non-serializable values into serializable ones. If
- * it is not possible, initial delivery is returned and error message is logged
- *
- * @param processed
- * @return
- */
- private Delivery serializableDeliveryOf(Delivery processed) {
- // All content of envelope is serializable, so no problem there
- Envelope envelope = processed.getEnvelope();
- // in basicproperties, there may be LongString, which are *not* serializable
- BasicProperties properties = processed.getProperties();
- BasicProperties nextProperties =
- new BasicProperties.Builder()
- .appId(properties.getAppId())
- .clusterId(properties.getClusterId())
- .contentEncoding(properties.getContentEncoding())
- .contentType(properties.getContentType())
- .correlationId(properties.getCorrelationId())
- .deliveryMode(properties.getDeliveryMode())
- .expiration(properties.getExpiration())
- .headers(serializableHeaders(properties.getHeaders()))
- .messageId(properties.getMessageId())
- .priority(properties.getPriority())
- .replyTo(properties.getReplyTo())
- .timestamp(properties.getTimestamp())
- .type(properties.getType())
- .userId(properties.getUserId())
- .build();
- return new Delivery(envelope, nextProperties, processed.getBody());
- }
-
- private Map<String, Object> serializableHeaders(Map<String, Object> headers) {
- Map<String, Object> returned = new HashMap<>();
- if (headers != null) {
- for (Map.Entry<String, Object> h : headers.entrySet()) {
- Object value = h.getValue();
- if (!(value instanceof Serializable)) {
- try {
- if (value instanceof LongString) {
- LongString longString = (LongString) value;
- byte[] bytes = longString.getBytes();
- String s = new String(bytes, "UTF-8");
- value = s;
- } else {
- throw new RuntimeException(
- String.format("no transformation defined for %s", value));
- }
- } catch (Throwable t) {
- throw new UnsupportedOperationException(
- String.format(
- "can't make unserializable value %s a serializable value (which is mandatory for Apache Beam dataflow implementation)",
- value),
- t);
- }
- }
- returned.put(h.getKey(), value);
- }
- }
- return returned;
- }
-
@Override
public void close() throws IOException {
if (connectionHandler != null) {
diff --git a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java
index 015d1af..1c24195 100644
--- a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java
+++ b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqMessage.java
@@ -18,7 +18,11 @@
package org.apache.beam.sdk.io.rabbitmq;
import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.AMQP.BasicProperties;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.LongString;
import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.QueueingConsumer.Delivery;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Date;
@@ -33,6 +37,67 @@
*/
public class RabbitMqMessage implements Serializable {
+ /**
+ * Make delivery serializable by cloning all non-serializable values into serializable ones. If it
+ * is not possible, initial delivery is returned and error message is logged
+ *
+ * @param processed
+ * @return
+ */
+ private static Delivery serializableDeliveryOf(Delivery processed) {
+ // All content of envelope is serializable, so no problem there
+ Envelope envelope = processed.getEnvelope();
+ // in basicproperties, there may be LongString, which are *not* serializable
+ BasicProperties properties = processed.getProperties();
+ BasicProperties nextProperties =
+ new BasicProperties.Builder()
+ .appId(properties.getAppId())
+ .clusterId(properties.getClusterId())
+ .contentEncoding(properties.getContentEncoding())
+ .contentType(properties.getContentType())
+ .correlationId(properties.getCorrelationId())
+ .deliveryMode(properties.getDeliveryMode())
+ .expiration(properties.getExpiration())
+ .headers(serializableHeaders(properties.getHeaders()))
+ .messageId(properties.getMessageId())
+ .priority(properties.getPriority())
+ .replyTo(properties.getReplyTo())
+ .timestamp(properties.getTimestamp())
+ .type(properties.getType())
+ .userId(properties.getUserId())
+ .build();
+ return new Delivery(envelope, nextProperties, processed.getBody());
+ }
+
+ private static Map<String, Object> serializableHeaders(Map<String, Object> headers) {
+ Map<String, Object> returned = new HashMap<>();
+ if (headers != null) {
+ for (Map.Entry<String, Object> h : headers.entrySet()) {
+ Object value = h.getValue();
+ if (!(value instanceof Serializable)) {
+ try {
+ if (value instanceof LongString) {
+ LongString longString = (LongString) value;
+ byte[] bytes = longString.getBytes();
+ String s = new String(bytes, "UTF-8");
+ value = s;
+ } else {
+ throw new RuntimeException(String.format("no transformation defined for %s", value));
+ }
+ } catch (Throwable t) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "can't make unserializable value %s a serializable value (which is mandatory for Apache Beam dataflow implementation)",
+ value),
+ t);
+ }
+ }
+ returned.put(h.getKey(), value);
+ }
+ }
+ return returned;
+ }
+
@Nullable private final String routingKey;
private final byte[] body;
private final String contentType;
@@ -71,6 +136,7 @@
public RabbitMqMessage(String routingKey, QueueingConsumer.Delivery delivery) {
this.routingKey = routingKey;
+ delivery = serializableDeliveryOf(delivery);
body = delivery.getBody();
contentType = delivery.getProperties().getContentType();
contentEncoding = delivery.getProperties().getContentEncoding();