CAMEL-20444 - Camel-Azure-Servicebus: Support setting of CorrelationId on producer

Signed-off-by: Andrea Cosentino <ancosen@gmail.com>
diff --git a/components/camel-azure/camel-azure-servicebus/src/generated/resources/org/apache/camel/component/azure/servicebus/azure-servicebus.json b/components/camel-azure/camel-azure-servicebus/src/generated/resources/org/apache/camel/component/azure/servicebus/azure-servicebus.json
index 4d9bf60..a74ccb7 100644
--- a/components/camel-azure/camel-azure-servicebus/src/generated/resources/org/apache/camel/component/azure/servicebus/azure-servicebus.json
+++ b/components/camel-azure/camel-azure-servicebus/src/generated/resources/org/apache/camel/component/azure/servicebus/azure-servicebus.json
@@ -55,29 +55,29 @@
   "headers": {
     "CamelAzureServiceBusApplicationProperties": { "index": 0, "kind": "header", "displayName": "", "group": "common", "label": "common", "required": false, "javaType": "Map<String, Object>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The application properties (also known as custom properties) on messages sent and received by the producer and consumer, respectively.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#APPLICATION_PROPERTIES" },
     "CamelAzureServiceBusContentType": { "index": 1, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the content type of the message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#CONTENT_TYPE" },
-    "CamelAzureServiceBusCorrelationId": { "index": 2, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets a correlation identifier.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#CORRELATION_ID" },
-    "CamelAzureServiceBusDeadLetterErrorDescription": { "index": 3, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the description for a message that has been dead-lettered.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DEAD_LETTER_ERROR_DESCRIPTION" },
-    "CamelAzureServiceBusDeadLetterReason": { "index": 4, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the reason a message was dead-lettered.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DEAD_LETTER_REASON" },
-    "CamelAzureServiceBusDeadLetterSource": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the name of the queue or subscription that this message was enqueued on, before it was dead-lettered.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DEAD_LETTER_SOURCE" },
-    "CamelAzureServiceBusDeliveryCount": { "index": 6, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the number of the times this message was delivered to clients.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DELIVERY_COUNT" },
-    "CamelAzureServiceBusEnqueuedSequenceNumber": { "index": 7, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the enqueued sequence number assigned to a message by Service Bus.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#ENQUEUED_SEQUENCE_NUMBER" },
-    "CamelAzureServiceBusEnqueuedTime": { "index": 8, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the datetime at which this message was enqueued in Azure Service Bus.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#ENQUEUED_TIME" },
-    "CamelAzureServiceBusExpiresAt": { "index": 9, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the datetime at which this message will expire.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#EXPIRES_AT" },
-    "CamelAzureServiceBusLockToken": { "index": 10, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the lock token for the current message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#LOCK_TOKEN" },
-    "CamelAzureServiceBusLockedUntil": { "index": 11, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the datetime at which the lock of this message expires.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#LOCKED_UNTIL" },
-    "CamelAzureServiceBusMessageId": { "index": 12, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the identifier for the message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#MESSAGE_ID" },
-    "CamelAzureServiceBusPartitionKey": { "index": 13, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the partition key for sending a message to a partitioned entity.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#PARTITION_KEY" },
-    "CamelAzureServiceBusRawAmqpMessage": { "index": 14, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "AmqpAnnotatedMessage", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The representation of message as defined by AMQP protocol.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#RAW_AMQP_MESSAGE" },
-    "CamelAzureServiceBusReplyTo": { "index": 15, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the address of an entity to send replies to.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#REPLY_TO" },
-    "CamelAzureServiceBusReplyToSessionId": { "index": 16, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets or sets a session identifier augmenting the ReplyTo address.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#REPLY_TO_SESSION_ID" },
-    "CamelAzureServiceBusSequenceNumber": { "index": 17, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the unique number assigned to a message by Service Bus.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SEQUENCE_NUMBER" },
-    "CamelAzureServiceBusSessionId": { "index": 18, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the session id of the message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SESSION_ID" },
-    "CamelAzureServiceBusSubject": { "index": 19, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the subject for the message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SUBJECT" },
-    "CamelAzureServiceBusTimeToLive": { "index": 20, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Duration", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the duration before this message expires.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#TIME_TO_LIVE" },
-    "CamelAzureServiceBusTo": { "index": 21, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the to address.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#TO" },
-    "CamelAzureServiceBusScheduledEnqueueTime": { "index": 22, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "(producer)Overrides the OffsetDateTime at which the message should appear in the Service Bus queue or topic. (consumer) Gets the scheduled enqueue time of this message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SCHEDULED_ENQUEUE_TIME" },
-    "CamelAzureServiceBusServiceBusTransactionContext": { "index": 23, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "ServiceBusTransactionContext", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Overrides the transaction in service. This object just contains transaction id.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SERVICE_BUS_TRANSACTION_CONTEXT" },
-    "CamelAzureServiceBusProducerOperation": { "index": 24, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition", "enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Overrides the desired operation to be used in the producer.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#PRODUCER_OPERATION" }
+    "CamelAzureServiceBusDeadLetterErrorDescription": { "index": 2, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the description for a message that has been dead-lettered.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DEAD_LETTER_ERROR_DESCRIPTION" },
+    "CamelAzureServiceBusDeadLetterReason": { "index": 3, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the reason a message was dead-lettered.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DEAD_LETTER_REASON" },
+    "CamelAzureServiceBusDeadLetterSource": { "index": 4, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the name of the queue or subscription that this message was enqueued on, before it was dead-lettered.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DEAD_LETTER_SOURCE" },
+    "CamelAzureServiceBusDeliveryCount": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the number of the times this message was delivered to clients.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#DELIVERY_COUNT" },
+    "CamelAzureServiceBusEnqueuedSequenceNumber": { "index": 6, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the enqueued sequence number assigned to a message by Service Bus.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#ENQUEUED_SEQUENCE_NUMBER" },
+    "CamelAzureServiceBusEnqueuedTime": { "index": 7, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the datetime at which this message was enqueued in Azure Service Bus.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#ENQUEUED_TIME" },
+    "CamelAzureServiceBusExpiresAt": { "index": 8, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the datetime at which this message will expire.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#EXPIRES_AT" },
+    "CamelAzureServiceBusLockToken": { "index": 9, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the lock token for the current message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#LOCK_TOKEN" },
+    "CamelAzureServiceBusLockedUntil": { "index": 10, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the datetime at which the lock of this message expires.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#LOCKED_UNTIL" },
+    "CamelAzureServiceBusMessageId": { "index": 11, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the identifier for the message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#MESSAGE_ID" },
+    "CamelAzureServiceBusPartitionKey": { "index": 12, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the partition key for sending a message to a partitioned entity.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#PARTITION_KEY" },
+    "CamelAzureServiceBusRawAmqpMessage": { "index": 13, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "AmqpAnnotatedMessage", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The representation of message as defined by AMQP protocol.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#RAW_AMQP_MESSAGE" },
+    "CamelAzureServiceBusReplyTo": { "index": 14, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the address of an entity to send replies to.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#REPLY_TO" },
+    "CamelAzureServiceBusReplyToSessionId": { "index": 15, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets or sets a session identifier augmenting the ReplyTo address.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#REPLY_TO_SESSION_ID" },
+    "CamelAzureServiceBusSequenceNumber": { "index": 16, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the unique number assigned to a message by Service Bus.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SEQUENCE_NUMBER" },
+    "CamelAzureServiceBusSessionId": { "index": 17, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the session id of the message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SESSION_ID" },
+    "CamelAzureServiceBusSubject": { "index": 18, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the subject for the message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SUBJECT" },
+    "CamelAzureServiceBusTimeToLive": { "index": 19, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Duration", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the duration before this message expires.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#TIME_TO_LIVE" },
+    "CamelAzureServiceBusTo": { "index": 20, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets the to address.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#TO" },
+    "CamelAzureServiceBusScheduledEnqueueTime": { "index": 21, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "OffsetDateTime", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "(producer)Overrides the OffsetDateTime at which the message should appear in the Service Bus queue or topic. (consumer) Gets the scheduled enqueue time of this message.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SCHEDULED_ENQUEUE_TIME" },
+    "CamelAzureServiceBusServiceBusTransactionContext": { "index": 22, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "ServiceBusTransactionContext", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Overrides the transaction in service. This object just contains transaction id.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#SERVICE_BUS_TRANSACTION_CONTEXT" },
+    "CamelAzureServiceBusProducerOperation": { "index": 23, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition", "enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Overrides the desired operation to be used in the producer.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#PRODUCER_OPERATION" },
+    "CamelAzureServiceBusCorrelationId": { "index": 24, "kind": "header", "displayName": "", "group": "common", "label": "common", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Gets or Sets a correlation identifier.", "constantName": "org.apache.camel.component.azure.servicebus.ServiceBusConstants#CORRELATION_ID" }
   },
   "properties": {
     "topicOrQueueName": { "index": 0, "kind": "path", "displayName": "Topic Or Queue Name", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.servicebus.ServiceBusConfiguration", "configurationField": "configuration", "description": "Selected topic name or the queue name, that is depending on serviceBusType config. For example if serviceBusType=queue, then this will be the queue name and if serviceBusType=topic, this will be the topic name." },
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConstants.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConstants.java
index cde4ff5..5ce1b5a 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConstants.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConstants.java
@@ -29,8 +29,6 @@
     public static final String APPLICATION_PROPERTIES = HEADER_PREFIX + "ApplicationProperties";
     @Metadata(label = "consumer", description = "Gets the content type of the message.", javaType = "String")
     public static final String CONTENT_TYPE = HEADER_PREFIX + "ContentType";
-    @Metadata(label = "consumer", description = "Gets a correlation identifier.", javaType = "String")
-    public static final String CORRELATION_ID = HEADER_PREFIX + "CorrelationId";
     @Metadata(label = "consumer", description = "Gets the description for a message that has been dead-lettered.",
               javaType = "String")
     public static final String DEAD_LETTER_ERROR_DESCRIPTION = HEADER_PREFIX + "DeadLetterErrorDescription";
@@ -98,6 +96,10 @@
               javaType = "org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition")
     public static final String PRODUCER_OPERATION = HEADER_PREFIX + "ProducerOperation";
 
+    // headers evaluated by the producer and consumer
+    @Metadata(label = "common", description = "Gets or Sets a correlation identifier.", javaType = "String")
+    public static final String CORRELATION_ID = HEADER_PREFIX + "CorrelationId";
+
     private ServiceBusConstants() {
     }
 }
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
index e6300d3..b390a6a 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
@@ -149,20 +149,23 @@
             final Object inputBody = exchange.getMessage().getBody();
             final Map<String, Object> applicationProperties
                     = exchange.getMessage().getHeader(ServiceBusConstants.APPLICATION_PROPERTIES, Map.class);
+            final String correlationId = exchange.getMessage().getHeader(ServiceBusConstants.CORRELATION_ID, String.class);
 
             Mono<Void> sendMessageAsync;
 
             if (inputBody instanceof Iterable<?>) {
                 sendMessageAsync
                         = serviceBusSenderOperations.sendMessages(convertBodyToList((Iterable<?>) inputBody),
-                                configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties);
+                                configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties,
+                                correlationId);
             } else {
                 Object convertedBody = inputBody instanceof BinaryData ? inputBody
                         : getConfiguration().isBinary() ? convertBodyToBinary(exchange)
                         : exchange.getMessage().getBody(String.class);
 
                 sendMessageAsync = serviceBusSenderOperations.sendMessages(convertedBody,
-                        configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties);
+                        configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties,
+                        correlationId);
             }
 
             subscribeToMono(sendMessageAsync, exchange, noop -> {
@@ -176,6 +179,7 @@
             final Object inputBody = exchange.getMessage().getBody();
             final Map<String, Object> applicationProperties
                     = exchange.getMessage().getHeader(ServiceBusConstants.APPLICATION_PROPERTIES, Map.class);
+            final String correlationId = exchange.getMessage().getHeader(ServiceBusConstants.CORRELATION_ID, String.class);
 
             Mono<List<Long>> scheduleMessagesAsync;
 
@@ -184,7 +188,8 @@
                         = serviceBusSenderOperations.scheduleMessages(convertBodyToList((Iterable<?>) inputBody),
                                 configurationOptionsProxy.getScheduledEnqueueTime(exchange),
                                 configurationOptionsProxy.getServiceBusTransactionContext(exchange),
-                                applicationProperties);
+                                applicationProperties,
+                                correlationId);
             } else {
                 Object convertedBody = inputBody instanceof BinaryData ? inputBody
                         : getConfiguration().isBinary() ? convertBodyToBinary(exchange)
@@ -193,7 +198,8 @@
                         = serviceBusSenderOperations.scheduleMessages(convertedBody,
                                 configurationOptionsProxy.getScheduledEnqueueTime(exchange),
                                 configurationOptionsProxy.getServiceBusTransactionContext(exchange),
-                                applicationProperties);
+                                applicationProperties,
+                                correlationId);
             }
 
             subscribeToMono(scheduleMessagesAsync, exchange,
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
index 4f26dbd..120b8c5 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
@@ -22,6 +22,7 @@
 
 import com.azure.core.util.BinaryData;
 import com.azure.messaging.servicebus.ServiceBusMessage;
+import org.apache.camel.util.ObjectHelper;
 
 public final class ServiceBusUtils {
 
@@ -29,7 +30,7 @@
     }
 
     public static ServiceBusMessage createServiceBusMessage(
-            final Object data, final Map<String, Object> applicationProperties) {
+            final Object data, final Map<String, Object> applicationProperties, final String correlationId) {
         ServiceBusMessage serviceBusMessage;
         if (data instanceof String) {
             serviceBusMessage = new ServiceBusMessage((String) data);
@@ -43,13 +44,16 @@
         if (applicationProperties != null) {
             serviceBusMessage.getRawAmqpMessage().getApplicationProperties().putAll(applicationProperties);
         }
+        if (ObjectHelper.isNotEmpty(correlationId)) {
+            serviceBusMessage.setCorrelationId(correlationId);
+        }
         return serviceBusMessage;
     }
 
     public static Iterable<ServiceBusMessage> createServiceBusMessages(
-            final Iterable<?> data, final Map<String, Object> applicationProperties) {
+            final Iterable<?> data, final Map<String, Object> applicationProperties, final String correlationId) {
         return StreamSupport.stream(data.spliterator(), false)
-                .map(obj -> createServiceBusMessage(obj, applicationProperties))
+                .map(obj -> createServiceBusMessage(obj, applicationProperties, correlationId))
                 .collect(Collectors.toList());
     }
 }
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java
index 6121682..c299e9c 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java
@@ -41,35 +41,39 @@
     public Mono<Void> sendMessages(
             final Object data,
             final ServiceBusTransactionContext context,
-            final Map<String, Object> applicationProperties) {
+            final Map<String, Object> applicationProperties,
+            final String correlationId) {
         if (data instanceof Iterable<?>) {
-            return sendMessages((Iterable<?>) data, context, applicationProperties);
+            return sendMessages((Iterable<?>) data, context, applicationProperties, correlationId);
         }
 
-        return sendMessage(data, context, applicationProperties);
+        return sendMessage(data, context, applicationProperties, correlationId);
     }
 
     public Mono<List<Long>> scheduleMessages(
             final Object data,
             final OffsetDateTime scheduledEnqueueTime,
             final ServiceBusTransactionContext context,
-            final Map<String, Object> applicationProperties) {
+            final Map<String, Object> applicationProperties,
+            final String correlationId) {
         if (ObjectHelper.isEmpty(scheduledEnqueueTime)) {
             throw new IllegalArgumentException("To schedule a message, you need to set scheduledEnqueueTime.");
         }
 
         if (data instanceof Iterable<?>) {
-            return scheduleMessages((Iterable<?>) data, scheduledEnqueueTime, context, applicationProperties);
+            return scheduleMessages((Iterable<?>) data, scheduledEnqueueTime, context, applicationProperties, correlationId);
         }
 
-        return scheduleMessage(data, scheduledEnqueueTime, context, applicationProperties);
+        return scheduleMessage(data, scheduledEnqueueTime, context, applicationProperties, correlationId);
     }
 
     private Mono<Void> sendMessages(
             final Iterable<?> data,
             final ServiceBusTransactionContext context,
-            final Map<String, Object> applicationProperties) {
-        final Iterable<ServiceBusMessage> messages = ServiceBusUtils.createServiceBusMessages(data, applicationProperties);
+            final Map<String, Object> applicationProperties,
+            final String correlationId) {
+        final Iterable<ServiceBusMessage> messages
+                = ServiceBusUtils.createServiceBusMessages(data, applicationProperties, correlationId);
 
         if (ObjectHelper.isEmpty(context)) {
             return client.sendMessages(messages);
@@ -81,8 +85,9 @@
     private Mono<Void> sendMessage(
             final Object data,
             final ServiceBusTransactionContext context,
-            final Map<String, Object> applicationProperties) {
-        final ServiceBusMessage message = ServiceBusUtils.createServiceBusMessage(data, applicationProperties);
+            final Map<String, Object> applicationProperties,
+            final String correlationId) {
+        final ServiceBusMessage message = ServiceBusUtils.createServiceBusMessage(data, applicationProperties, correlationId);
 
         if (ObjectHelper.isEmpty(context)) {
             return client.sendMessage(message);
@@ -95,8 +100,9 @@
             final Object data,
             final OffsetDateTime scheduledEnqueueTime,
             final ServiceBusTransactionContext context,
-            final Map<String, Object> applicationProperties) {
-        final ServiceBusMessage message = ServiceBusUtils.createServiceBusMessage(data, applicationProperties);
+            final Map<String, Object> applicationProperties,
+            final String correlationId) {
+        final ServiceBusMessage message = ServiceBusUtils.createServiceBusMessage(data, applicationProperties, correlationId);
 
         if (ObjectHelper.isEmpty(context)) {
             return client.scheduleMessage(message, scheduledEnqueueTime)
@@ -110,8 +116,10 @@
     private Mono<List<Long>> scheduleMessages(
             final Iterable<?> data, final OffsetDateTime scheduledEnqueueTime,
             final ServiceBusTransactionContext context,
-            final Map<String, Object> applicationProperties) {
-        final Iterable<ServiceBusMessage> messages = ServiceBusUtils.createServiceBusMessages(data, applicationProperties);
+            final Map<String, Object> applicationProperties,
+            final String correlationId) {
+        final Iterable<ServiceBusMessage> messages
+                = ServiceBusUtils.createServiceBusMessages(data, applicationProperties, correlationId);
 
         if (ObjectHelper.isEmpty(context)) {
             return client.scheduleMessages(messages, scheduledEnqueueTime)
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
index f952bc8..33951cf 100644
--- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
@@ -32,18 +32,18 @@
     @Test
     void testCreateServiceBusMessage() {
         // test string
-        final ServiceBusMessage message1 = ServiceBusUtils.createServiceBusMessage("test string", null);
+        final ServiceBusMessage message1 = ServiceBusUtils.createServiceBusMessage("test string", null, null);
 
         assertEquals("test string", message1.getBody().toString());
 
         // test int
-        final ServiceBusMessage message2 = ServiceBusUtils.createServiceBusMessage(String.valueOf(12345), null);
+        final ServiceBusMessage message2 = ServiceBusUtils.createServiceBusMessage(String.valueOf(12345), null, null);
 
         assertEquals("12345", message2.getBody().toString());
 
         //test bytes
         byte[] testByteBody = "test string".getBytes(StandardCharsets.UTF_8);
-        final ServiceBusMessage message3 = ServiceBusUtils.createServiceBusMessage(testByteBody, null);
+        final ServiceBusMessage message3 = ServiceBusUtils.createServiceBusMessage(testByteBody, null, null);
         assertArrayEquals(testByteBody, message3.getBody().toBytes());
     }
 
@@ -53,7 +53,7 @@
         inputMessages.add("test data");
         inputMessages.add(String.valueOf(12345));
 
-        final Iterable<ServiceBusMessage> busMessages = ServiceBusUtils.createServiceBusMessages(inputMessages, null);
+        final Iterable<ServiceBusMessage> busMessages = ServiceBusUtils.createServiceBusMessages(inputMessages, null, null);
 
         assertTrue(StreamSupport.stream(busMessages.spliterator(), false)
                 .anyMatch(record -> record.getBody().toString().equals("test data")));
@@ -67,7 +67,7 @@
         inputMessages2.add(byteBody1);
         inputMessages2.add(byteBody2);
 
-        final Iterable<ServiceBusMessage> busMessages2 = ServiceBusUtils.createServiceBusMessages(inputMessages2, null);
+        final Iterable<ServiceBusMessage> busMessages2 = ServiceBusUtils.createServiceBusMessages(inputMessages2, null, null);
 
         assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
                 .anyMatch(message -> Arrays.equals(message.getBody().toBytes(), byteBody1)));
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusConsumerTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusConsumerTest.java
index dbb34aa..9ff5ebb 100644
--- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusConsumerTest.java
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusConsumerTest.java
@@ -53,7 +53,7 @@
         inputBatch.add("test batch 3");
 
         new ServiceBusSenderOperations(new ServiceBusSenderAsyncClientWrapper(senderAsyncClient))
-                .sendMessages(inputBatch, null, null)
+                .sendMessages(inputBatch, null, null, null)
                 .block();
 
         // test the data now
@@ -83,7 +83,7 @@
         inputBatch.add("peek test batch 3");
 
         new ServiceBusSenderOperations(new ServiceBusSenderAsyncClientWrapper(senderAsyncClient))
-                .sendMessages(inputBatch, null, null)
+                .sendMessages(inputBatch, null, null, null)
                 .block();
 
         // test the data now
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java
index f8a5354..b131609 100644
--- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java
@@ -88,7 +88,7 @@
     void testSendSingleMessage() {
         final ServiceBusSenderOperations operations = new ServiceBusSenderOperations(clientSenderWrapper);
 
-        operations.sendMessages("test data", null, Map.of("customKey", "customValue")).block();
+        operations.sendMessages("test data", null, Map.of("customKey", "customValue"), null).block();
 
         final boolean exists = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
                 .anyMatch(serviceBusReceivedMessage -> serviceBusReceivedMessage.getBody().toString().equals("test data"));
@@ -97,7 +97,7 @@
 
         //test bytes
         byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
-        operations.sendMessages(testByteBody, null, Map.of("customKey", "customValue")).block();
+        operations.sendMessages(testByteBody, null, Map.of("customKey", "customValue"), null).block();
         final boolean exists2 = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
                 .anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(),
                         testByteBody));
@@ -105,7 +105,7 @@
 
         // test if we have something other than string or byte[]
         assertThrows(IllegalArgumentException.class, () -> {
-            operations.sendMessages(12345, null, null).block();
+            operations.sendMessages(12345, null, null, null).block();
         });
     }
 
@@ -118,7 +118,7 @@
         inputBatch.add("test batch 2");
         inputBatch.add("test batch 3");
 
-        operations.sendMessages(inputBatch, null, null).block();
+        operations.sendMessages(inputBatch, null, null, null).block();
 
         final Spliterator<ServiceBusReceivedMessage> receivedMessages
                 = clientReceiverWrapper.receiveMessages().toIterable().spliterator();
@@ -143,7 +143,7 @@
         inputBatch2.add(byteBody1);
         inputBatch2.add(byteBody2);
 
-        operations.sendMessages(inputBatch2, null, null).block();
+        operations.sendMessages(inputBatch2, null, null, null).block();
         final Spliterator<ServiceBusReceivedMessage> receivedMessages2
                 = clientReceiverWrapper.receiveMessages().toIterable().spliterator();
 
@@ -161,7 +161,7 @@
     void testScheduleMessage() {
         final ServiceBusSenderOperations operations = new ServiceBusSenderOperations(clientSenderWrapper);
 
-        operations.scheduleMessages("testScheduleMessage", OffsetDateTime.now(), null, null).block();
+        operations.scheduleMessages("testScheduleMessage", OffsetDateTime.now(), null, null, null).block();
 
         final boolean exists = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
                 .anyMatch(serviceBusReceivedMessage -> serviceBusReceivedMessage.getBody().toString()
@@ -171,7 +171,7 @@
 
         //test bytes
         byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
-        operations.scheduleMessages(testByteBody, OffsetDateTime.now(), null, null).block();
+        operations.scheduleMessages(testByteBody, OffsetDateTime.now(), null, null, null).block();
         final boolean exists2 = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
                 .anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(),
                         testByteBody));
@@ -179,7 +179,7 @@
 
         // test if we have something other than string or byte[]
         assertThrows(IllegalArgumentException.class, () -> {
-            operations.scheduleMessages(12345, OffsetDateTime.now(), null, null).block();
+            operations.scheduleMessages(12345, OffsetDateTime.now(), null, null, null).block();
         });
     }
 
@@ -192,7 +192,7 @@
         inputBatch.add("testSchedulingBatchMessages 2");
         inputBatch.add("testSchedulingBatchMessages 3");
 
-        operations.scheduleMessages(inputBatch, OffsetDateTime.now(), null, null).block();
+        operations.scheduleMessages(inputBatch, OffsetDateTime.now(), null, null, null).block();
 
         final Spliterator<ServiceBusReceivedMessage> receivedMessages
                 = clientReceiverWrapper.receiveMessages().toIterable().spliterator();
@@ -220,7 +220,7 @@
         inputBatch2.add(byteBody1);
         inputBatch2.add(byteBody2);
 
-        operations.scheduleMessages(inputBatch2, OffsetDateTime.now(), null, null).block();
+        operations.scheduleMessages(inputBatch2, OffsetDateTime.now(), null, null, null).block();
         final Spliterator<ServiceBusReceivedMessage> receivedMessages2
                 = clientReceiverWrapper.receiveMessages().toIterable().spliterator();