CAMEL-20444 - Camel-Azure-Servicebus: Support setting of CorrelationId on producer
Signed-off-by: Andrea Cosentino <ancosen@gmail.com>
diff --git a/components/camel-azure/camel-azure-servicebus/src/generated/resources/org/apache/camel/component/azure/servicebus/azure-servicebus.json b/components/camel-azure/camel-azure-servicebus/src/generated/resources/org/apache/camel/component/azure/servicebus/azure-servicebus.json
index 4d9bf60..a74ccb7 100644
--- a/components/camel-azure/camel-azure-servicebus/src/generated/resources/org/apache/camel/component/azure/servicebus/azure-servicebus.json
+++ b/components/camel-azure/camel-azure-servicebus/src/generated/resources/org/apache/camel/component/azure/servicebus/azure-servicebus.json
@@ -55,29 +55,29 @@
"headers": {
"CamelAzureServiceBusApplicationProperties": { "index": 0, "kind": "header", "displayName": "", "group": "common", "label": "common", "required": false, "javaType": "Map<String, Object>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The application properties (also known as custom properties) on messages sent and received by the producer and consumer, respectively.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#APPLICATION_PROPERTIES" },
"CamelAzureServiceBusContentType": { "index": 1, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the content type of the message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#CONTENT_TYPE" },
- "CamelAzureServiceBusCorrelationId": { "index": 2, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets a correlation identifier.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#CORRELATION_ID" },
- "CamelAzureServiceBusDeadLetterErrorDescription": { "index": 3, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the description for a message that has been dead-lettered.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DEAD_LETTER_ERROR_DESCRIPTION" },
- "CamelAzureServiceBusDeadLetterReason": { "index": 4, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the reason a message was dead-lettered.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DEAD_LETTER_REASON" },
- "CamelAzureServiceBusDeadLetterSource": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the name of the queue or subscription that this message was enqueued on, before it was dead-lettered.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DEAD_LETTER_SOURCE" },
- "CamelAzureServiceBusDeliveryCount": { "index": 6, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the number of the times this message was delivered to clients.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DELIVERY_COUNT" },
- "CamelAzureServiceBusEnqueuedSequenceNumber": { "index": 7, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the enqueued sequence number assigned to a message by Service Bus.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#ENQUEUED_SEQUENCE_NUMBER" },
- "CamelAzureServiceBusEnqueuedTime": { "index": 8, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the datetime at which this message was enqueued in Azure Service Bus.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#ENQUEUED_TIME" },
- "CamelAzureServiceBusExpiresAt": { "index": 9, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the datetime at which this message will expire.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#EXPIRES_AT" },
- "CamelAzureServiceBusLockToken": { "index": 10, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the lock token for the current message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#LOCK_TOKEN" },
- "CamelAzureServiceBusLockedUntil": { "index": 11, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the datetime at which the lock of this message expires.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#LOCKED_UNTIL" },
- "CamelAzureServiceBusMessageId": { "index": 12, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the identifier for the message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#MESSAGE_ID" },
- "CamelAzureServiceBusPartitionKey": { "index": 13, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the partition key for sending a message to a partitioned entity.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#PARTITION_KEY" },
- "CamelAzureServiceBusRawAmqpMessage": { "index": 14, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "AmqpAnnotatedMessage", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The representation of message as defined by AMQP protocol.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#RAW_AMQP_MESSAGE" },
- "CamelAzureServiceBusReplyTo": { "index": 15, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the address of an entity to send replies to.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#REPLY_TO" },
- "CamelAzureServiceBusReplyToSessionId": { "index": 16, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets or sets a session identifier augmenting the ReplyTo address.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#REPLY_TO_SESSION_ID" },
- "CamelAzureServiceBusSequenceNumber": { "index": 17, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the unique number assigned to a message by Service Bus.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SEQUENCE_NUMBER" },
- "CamelAzureServiceBusSessionId": { "index": 18, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the session id of the message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SESSION_ID" },
- "CamelAzureServiceBusSubject": { "index": 19, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the subject for the message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SUBJECT" },
- "CamelAzureServiceBusTimeToLive": { "index": 20, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Duration", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the duration before this message expires.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#TIME_TO_LIVE" },
- "CamelAzureServiceBusTo": { "index": 21, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the to address.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#TO" },
- "CamelAzureServiceBusScheduledEnqueueTime": { "index": 22, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "(producer)Overrides the OffsetDateTime at which the message should appear in the Service Bus queue or topic. (consumer) Gets the scheduled enqueue time of this message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SCHEDULED_ENQUEUE_TIME" },
- "CamelAzureServiceBusServiceBusTransactionContext": { "index": 23, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "ServiceBusTransactionContext", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Overrides the transaction in service. This object just contains transaction id.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SERVICE_BUS_TRANSACTION_CONTEXT" },
- "CamelAzureServiceBusProducerOperation": { "index": 24, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition", "enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Overrides the desired operation to be used in the producer.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#PRODUCER_OPERATION" }
+ "CamelAzureServiceBusDeadLetterErrorDescription": { "index": 2, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the description for a message that has been dead-lettered.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DEAD_LETTER_ERROR_DESCRIPTION" },
+ "CamelAzureServiceBusDeadLetterReason": { "index": 3, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the reason a message was dead-lettered.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DEAD_LETTER_REASON" },
+ "CamelAzureServiceBusDeadLetterSource": { "index": 4, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the name of the queue or subscription that this message was enqueued on, before it was dead-lettered.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DEAD_LETTER_SOURCE" },
+ "CamelAzureServiceBusDeliveryCount": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the number of the times this message was delivered to clients.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DELIVERY_COUNT" },
+ "CamelAzureServiceBusEnqueuedSequenceNumber": { "index": 6, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the enqueued sequence number assigned to a message by Service Bus.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#ENQUEUED_SEQUENCE_NUMBER" },
+ "CamelAzureServiceBusEnqueuedTime": { "index": 7, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the datetime at which this message was enqueued in Azure Service Bus.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#ENQUEUED_TIME" },
+ "CamelAzureServiceBusExpiresAt": { "index": 8, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the datetime at which this message will expire.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#EXPIRES_AT" },
+ "CamelAzureServiceBusLockToken": { "index": 9, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the lock token for the current message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#LOCK_TOKEN" },
+ "CamelAzureServiceBusLockedUntil": { "index": 10, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the datetime at which the lock of this message expires.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#LOCKED_UNTIL" },
+ "CamelAzureServiceBusMessageId": { "index": 11, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the identifier for the message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#MESSAGE_ID" },
+ "CamelAzureServiceBusPartitionKey": { "index": 12, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the partition key for sending a message to a partitioned entity.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#PARTITION_KEY" },
+ "CamelAzureServiceBusRawAmqpMessage": { "index": 13, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "AmqpAnnotatedMessage", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The representation of message as defined by AMQP protocol.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#RAW_AMQP_MESSAGE" },
+ "CamelAzureServiceBusReplyTo": { "index": 14, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the address of an entity to send replies to.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#REPLY_TO" },
+ "CamelAzureServiceBusReplyToSessionId": { "index": 15, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets or sets a session identifier augmenting the ReplyTo address.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#REPLY_TO_SESSION_ID" },
+ "CamelAzureServiceBusSequenceNumber": { "index": 16, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the unique number assigned to a message by Service Bus.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SEQUENCE_NUMBER" },
+ "CamelAzureServiceBusSessionId": { "index": 17, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the session id of the message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SESSION_ID" },
+ "CamelAzureServiceBusSubject": { "index": 18, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the subject for the message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SUBJECT" },
+ "CamelAzureServiceBusTimeToLive": { "index": 19, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Duration", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the duration before this message expires.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#TIME_TO_LIVE" },
+ "CamelAzureServiceBusTo": { "index": 20, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the to address.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#TO" },
+ "CamelAzureServiceBusScheduledEnqueueTime": { "index": 21, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "(producer)Overrides the OffsetDateTime at which the message should appear in the Service Bus queue or topic. (consumer) Gets the scheduled enqueue time of this message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SCHEDULED_ENQUEUE_TIME" },
+ "CamelAzureServiceBusServiceBusTransactionContext": { "index": 22, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "ServiceBusTransactionContext", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Overrides the transaction in service. This object just contains transaction id.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SERVICE_BUS_TRANSACTION_CONTEXT" },
+ "CamelAzureServiceBusProducerOperation": { "index": 23, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition", "enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Overrides the desired operation to be used in the producer.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#PRODUCER_OPERATION" },
+ "CamelAzureServiceBusCorrelationId": { "index": 24, "kind": "header", "displayName": "", "group": "common", "label": "common", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets or Sets a correlation identifier.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#CORRELATION_ID" }
},
"properties": {
"topicOrQueueName": { "index": 0, "kind": "path", "displayName": "Topic Or Queue Name", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Selected topic name or the queue name, that is depending on serviceBusType config. For example if serviceBusType=queue, then this will be the queue name and if serviceBusType=topic, this will be the topic name." },
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConstants.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConstants.java
index cde4ff5..5ce1b5a 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConstants.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConstants.java
@@ -29,8 +29,6 @@
public static final String APPLICATION_PROPERTIES = HEADER_PREFIX + "ApplicationProperties";
@Metadata(label = "consumer", description = "Gets the content type of the message.", javaType = "String")
public static final String CONTENT_TYPE = HEADER_PREFIX + "ContentType";
- @Metadata(label = "consumer", description = "Gets a correlation identifier.", javaType = "String")
- public static final String CORRELATION_ID = HEADER_PREFIX + "CorrelationId";
@Metadata(label = "consumer", description = "Gets the description for a message that has been dead-lettered.",
javaType = "String")
public static final String DEAD_LETTER_ERROR_DESCRIPTION = HEADER_PREFIX + "DeadLetterErrorDescription";
@@ -98,6 +96,10 @@
javaType = "org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition")
public static final String PRODUCER_OPERATION = HEADER_PREFIX + "ProducerOperation";
+ // headers evaluated by the producer and consumer
+ @Metadata(label = "common", description = "Gets or Sets a correlation identifier.", javaType = "String")
+ public static final String CORRELATION_ID = HEADER_PREFIX + "CorrelationId";
+
private ServiceBusConstants() {
}
}
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
index e6300d3..b390a6a 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
@@ -149,20 +149,23 @@
final Object inputBody = exchange.getMessage().getBody();
final Map<String, Object> applicationProperties
= exchange.getMessage().getHeader(ServiceBusConstants.APPLICATION_PROPERTIES, Map.class);
+ final String correlationId = exchange.getMessage().getHeader(ServiceBusConstants.CORRELATION_ID, String.class);
Mono<Void> sendMessageAsync;
if (inputBody instanceof Iterable<?>) {
sendMessageAsync
= serviceBusSenderOperations.sendMessages(convertBodyToList((Iterable<?>) inputBody),
- configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties);
+ configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties,
+ correlationId);
} else {
Object convertedBody = inputBody instanceof BinaryData ? inputBody
: getConfiguration().isBinary() ? convertBodyToBinary(exchange)
: exchange.getMessage().getBody(String.class);
sendMessageAsync = serviceBusSenderOperations.sendMessages(convertedBody,
- configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties);
+ configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties,
+ correlationId);
}
subscribeToMono(sendMessageAsync, exchange, noop -> {
@@ -176,6 +179,7 @@
final Object inputBody = exchange.getMessage().getBody();
final Map<String, Object> applicationProperties
= exchange.getMessage().getHeader(ServiceBusConstants.APPLICATION_PROPERTIES, Map.class);
+ final String correlationId = exchange.getMessage().getHeader(ServiceBusConstants.CORRELATION_ID, String.class);
Mono<List<Long>> scheduleMessagesAsync;
@@ -184,7 +188,8 @@
= serviceBusSenderOperations.scheduleMessages(convertBodyToList((Iterable<?>) inputBody),
configurationOptionsProxy.getScheduledEnqueueTime(exchange),
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
- applicationProperties);
+ applicationProperties,
+ correlationId);
} else {
Object convertedBody = inputBody instanceof BinaryData ? inputBody
: getConfiguration().isBinary() ? convertBodyToBinary(exchange)
@@ -193,7 +198,8 @@
= serviceBusSenderOperations.scheduleMessages(convertedBody,
configurationOptionsProxy.getScheduledEnqueueTime(exchange),
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
- applicationProperties);
+ applicationProperties,
+ correlationId);
}
subscribeToMono(scheduleMessagesAsync, exchange,
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
index 4f26dbd..120b8c5 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
@@ -22,6 +22,7 @@
import com.azure.core.util.BinaryData;
import com.azure.messaging.servicebus.ServiceBusMessage;
+import org.apache.camel.util.ObjectHelper;
public final class ServiceBusUtils {
@@ -29,7 +30,7 @@
}
public static ServiceBusMessage createServiceBusMessage(
- final Object data, final Map<String, Object> applicationProperties) {
+ final Object data, final Map<String, Object> applicationProperties, final String correlationId) {
ServiceBusMessage serviceBusMessage;
if (data instanceof String) {
serviceBusMessage = new ServiceBusMessage((String) data);
@@ -43,13 +44,16 @@
if (applicationProperties != null) {
serviceBusMessage.getRawAmqpMessage().getApplicationProperties().putAll(applicationProperties);
}
+ if (ObjectHelper.isNotEmpty(correlationId)) {
+ serviceBusMessage.setCorrelationId(correlationId);
+ }
return serviceBusMessage;
}
public static Iterable<ServiceBusMessage> createServiceBusMessages(
- final Iterable<?> data, final Map<String, Object> applicationProperties) {
+ final Iterable<?> data, final Map<String, Object> applicationProperties, final String correlationId) {
return StreamSupport.stream(data.spliterator(), false)
- .map(obj -> createServiceBusMessage(obj, applicationProperties))
+ .map(obj -> createServiceBusMessage(obj, applicationProperties, correlationId))
.collect(Collectors.toList());
}
}
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java
index 6121682..c299e9c 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java
@@ -41,35 +41,39 @@
public Mono<Void> sendMessages(
final Object data,
final ServiceBusTransactionContext context,
- final Map<String, Object> applicationProperties) {
+ final Map<String, Object> applicationProperties,
+ final String correlationId) {
if (data instanceof Iterable<?>) {
- return sendMessages((Iterable<?>) data, context, applicationProperties);
+ return sendMessages((Iterable<?>) data, context, applicationProperties, correlationId);
}
- return sendMessage(data, context, applicationProperties);
+ return sendMessage(data, context, applicationProperties, correlationId);
}
public Mono<List<Long>> scheduleMessages(
final Object data,
final OffsetDateTime scheduledEnqueueTime,
final ServiceBusTransactionContext context,
- final Map<String, Object> applicationProperties) {
+ final Map<String, Object> applicationProperties,
+ final String correlationId) {
if (ObjectHelper.isEmpty(scheduledEnqueueTime)) {
throw new IllegalArgumentException("To schedule a message, you need to set scheduledEnqueueTime.");
}
if (data instanceof Iterable<?>) {
- return scheduleMessages((Iterable<?>) data, scheduledEnqueueTime, context, applicationProperties);
+ return scheduleMessages((Iterable<?>) data, scheduledEnqueueTime, context, applicationProperties, correlationId);
}
- return scheduleMessage(data, scheduledEnqueueTime, context, applicationProperties);
+ return scheduleMessage(data, scheduledEnqueueTime, context, applicationProperties, correlationId);
}
private Mono<Void> sendMessages(
final Iterable<?> data,
final ServiceBusTransactionContext context,
- final Map<String, Object> applicationProperties) {
- final Iterable<ServiceBusMessage> messages = ServiceBusUtils.createServiceBusMessages(data, applicationProperties);
+ final Map<String, Object> applicationProperties,
+ final String correlationId) {
+ final Iterable<ServiceBusMessage> messages
+ = ServiceBusUtils.createServiceBusMessages(data, applicationProperties, correlationId);
if (ObjectHelper.isEmpty(context)) {
return client.sendMessages(messages);
@@ -81,8 +85,9 @@
private Mono<Void> sendMessage(
final Object data,
final ServiceBusTransactionContext context,
- final Map<String, Object> applicationProperties) {
- final ServiceBusMessage message = ServiceBusUtils.createServiceBusMessage(data, applicationProperties);
+ final Map<String, Object> applicationProperties,
+ final String correlationId) {
+ final ServiceBusMessage message = ServiceBusUtils.createServiceBusMessage(data, applicationProperties, correlationId);
if (ObjectHelper.isEmpty(context)) {
return client.sendMessage(message);
@@ -95,8 +100,9 @@
final Object data,
final OffsetDateTime scheduledEnqueueTime,
final ServiceBusTransactionContext context,
- final Map<String, Object> applicationProperties) {
- final ServiceBusMessage message = ServiceBusUtils.createServiceBusMessage(data, applicationProperties);
+ final Map<String, Object> applicationProperties,
+ final String correlationId) {
+ final ServiceBusMessage message = ServiceBusUtils.createServiceBusMessage(data, applicationProperties, correlationId);
if (ObjectHelper.isEmpty(context)) {
return client.scheduleMessage(message, scheduledEnqueueTime)
@@ -110,8 +116,10 @@
private Mono<List<Long>> scheduleMessages(
final Iterable<?> data, final OffsetDateTime scheduledEnqueueTime,
final ServiceBusTransactionContext context,
- final Map<String, Object> applicationProperties) {
- final Iterable<ServiceBusMessage> messages = ServiceBusUtils.createServiceBusMessages(data, applicationProperties);
+ final Map<String, Object> applicationProperties,
+ final String correlationId) {
+ final Iterable<ServiceBusMessage> messages
+ = ServiceBusUtils.createServiceBusMessages(data, applicationProperties, correlationId);
if (ObjectHelper.isEmpty(context)) {
return client.scheduleMessages(messages, scheduledEnqueueTime)
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
index f952bc8..33951cf 100644
--- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
@@ -32,18 +32,18 @@
@Test
void testCreateServiceBusMessage() {
// test string
- final ServiceBusMessage message1 = ServiceBusUtils.createServiceBusMessage("test string", null);
+ final ServiceBusMessage message1 = ServiceBusUtils.createServiceBusMessage("test string", null, null);
assertEquals("test string", message1.getBody().toString());
// test int
- final ServiceBusMessage message2 = ServiceBusUtils.createServiceBusMessage(String.valueOf(12345), null);
+ final ServiceBusMessage message2 = ServiceBusUtils.createServiceBusMessage(String.valueOf(12345), null, null);
assertEquals("12345", message2.getBody().toString());
//test bytes
byte[] testByteBody = "test string".getBytes(StandardCharsets.UTF_8);
- final ServiceBusMessage message3 = ServiceBusUtils.createServiceBusMessage(testByteBody, null);
+ final ServiceBusMessage message3 = ServiceBusUtils.createServiceBusMessage(testByteBody, null, null);
assertArrayEquals(testByteBody, message3.getBody().toBytes());
}
@@ -53,7 +53,7 @@
inputMessages.add("test data");
inputMessages.add(String.valueOf(12345));
- final Iterable<ServiceBusMessage> busMessages = ServiceBusUtils.createServiceBusMessages(inputMessages, null);
+ final Iterable<ServiceBusMessage> busMessages = ServiceBusUtils.createServiceBusMessages(inputMessages, null, null);
assertTrue(StreamSupport.stream(busMessages.spliterator(), false)
.anyMatch(record -> record.getBody().toString().equals("test data")));
@@ -67,7 +67,7 @@
inputMessages2.add(byteBody1);
inputMessages2.add(byteBody2);
- final Iterable<ServiceBusMessage> busMessages2 = ServiceBusUtils.createServiceBusMessages(inputMessages2, null);
+ final Iterable<ServiceBusMessage> busMessages2 = ServiceBusUtils.createServiceBusMessages(inputMessages2, null, null);
assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
.anyMatch(message -> Arrays.equals(message.getBody().toBytes(), byteBody1)));
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusConsumerTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusConsumerTest.java
index dbb34aa..9ff5ebb 100644
--- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusConsumerTest.java
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusConsumerTest.java
@@ -53,7 +53,7 @@
inputBatch.add("test batch 3");
new ServiceBusSenderOperations(new ServiceBusSenderAsyncClientWrapper(senderAsyncClient))
- .sendMessages(inputBatch, null, null)
+ .sendMessages(inputBatch, null, null, null)
.block();
// test the data now
@@ -83,7 +83,7 @@
inputBatch.add("peek test batch 3");
new ServiceBusSenderOperations(new ServiceBusSenderAsyncClientWrapper(senderAsyncClient))
- .sendMessages(inputBatch, null, null)
+ .sendMessages(inputBatch, null, null, null)
.block();
// test the data now
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java
index f8a5354..b131609 100644
--- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java
@@ -88,7 +88,7 @@
void testSendSingleMessage() {
final ServiceBusSenderOperations operations = new ServiceBusSenderOperations(clientSenderWrapper);
- operations.sendMessages("test data", null, Map.of("customKey", "customValue")).block();
+ operations.sendMessages("test data", null, Map.of("customKey", "customValue"), null).block();
final boolean exists = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
.anyMatch(serviceBusReceivedMessage -> serviceBusReceivedMessage.getBody().toString().equals("test data"));
@@ -97,7 +97,7 @@
//test bytes
byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
- operations.sendMessages(testByteBody, null, Map.of("customKey", "customValue")).block();
+ operations.sendMessages(testByteBody, null, Map.of("customKey", "customValue"), null).block();
final boolean exists2 = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
.anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(),
testByteBody));
@@ -105,7 +105,7 @@
// test if we have something other than string or byte[]
assertThrows(IllegalArgumentException.class, () -> {
- operations.sendMessages(12345, null, null).block();
+ operations.sendMessages(12345, null, null, null).block();
});
}
@@ -118,7 +118,7 @@
inputBatch.add("test batch 2");
inputBatch.add("test batch 3");
- operations.sendMessages(inputBatch, null, null).block();
+ operations.sendMessages(inputBatch, null, null, null).block();
final Spliterator<ServiceBusReceivedMessage> receivedMessages
= clientReceiverWrapper.receiveMessages().toIterable().spliterator();
@@ -143,7 +143,7 @@
inputBatch2.add(byteBody1);
inputBatch2.add(byteBody2);
- operations.sendMessages(inputBatch2, null, null).block();
+ operations.sendMessages(inputBatch2, null, null, null).block();
final Spliterator<ServiceBusReceivedMessage> receivedMessages2
= clientReceiverWrapper.receiveMessages().toIterable().spliterator();
@@ -161,7 +161,7 @@
void testScheduleMessage() {
final ServiceBusSenderOperations operations = new ServiceBusSenderOperations(clientSenderWrapper);
- operations.scheduleMessages("testScheduleMessage", OffsetDateTime.now(), null, null).block();
+ operations.scheduleMessages("testScheduleMessage", OffsetDateTime.now(), null, null, null).block();
final boolean exists = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
.anyMatch(serviceBusReceivedMessage -> serviceBusReceivedMessage.getBody().toString()
@@ -171,7 +171,7 @@
//test bytes
byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
- operations.scheduleMessages(testByteBody, OffsetDateTime.now(), null, null).block();
+ operations.scheduleMessages(testByteBody, OffsetDateTime.now(), null, null, null).block();
final boolean exists2 = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
.anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(),
testByteBody));
@@ -179,7 +179,7 @@
// test if we have something other than string or byte[]
assertThrows(IllegalArgumentException.class, () -> {
- operations.scheduleMessages(12345, OffsetDateTime.now(), null, null).block();
+ operations.scheduleMessages(12345, OffsetDateTime.now(), null, null, null).block();
});
}
@@ -192,7 +192,7 @@
inputBatch.add("testSchedulingBatchMessages 2");
inputBatch.add("testSchedulingBatchMessages 3");
- operations.scheduleMessages(inputBatch, OffsetDateTime.now(), null, null).block();
+ operations.scheduleMessages(inputBatch, OffsetDateTime.now(), null, null, null).block();
final Spliterator<ServiceBusReceivedMessage> receivedMessages
= clientReceiverWrapper.receiveMessages().toIterable().spliterator();
@@ -220,7 +220,7 @@
inputBatch2.add(byteBody1);
inputBatch2.add(byteBody2);
- operations.scheduleMessages(inputBatch2, OffsetDateTime.now(), null, null).block();
+ operations.scheduleMessages(inputBatch2, OffsetDateTime.now(), null, null, null).block();
final Spliterator<ServiceBusReceivedMessage> receivedMessages2
= clientReceiverWrapper.receiveMessages().toIterable().spliterator();