[ISSUE #233]WorkerSourceTask send messages to support partition ordering (#237)
* Fix debezium demecial type conversion problem #190
* Upgrade rocketmq-replicator API to v0.1.3 #189
* Encountered change event for table databasename.tablename whose schema isn`t known to this connector #191
* Debezium mysql source connector delete event causes null pointer #196
* remove local config
* Debezium mysql source connector delete event causes null pointer #196
* Rocketmq replicator running null pointer #205
* WorkerSourceTask send messages to support partition ordering #233
* WorkerSourceTask send messages to support partition ordering #233
* test send message by key
* remove unused code
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 40c7eac..5d4e98e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -34,6 +34,7 @@
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
@@ -215,7 +216,8 @@
/**prepare to send record*/
Optional<RecordOffsetManagement.SubmittedPosition> submittedRecordPosition = prepareToSendRecord(preTransformRecord);
try {
- producer.send(sourceMessage, new SendCallback() {
+
+ SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult result) {
log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic());
@@ -235,7 +237,17 @@
// record send failed
recordSendFailed(false, sourceMessage, preTransformRecord, throwable);
}
- });
+ };
+
+ if (StringUtils.isEmpty(sourceMessage.getKeys())) {
+ // Round robin
+ producer.send(sourceMessage, callback);
+ } else {
+ // Partition message ordering,
+ // At the same time, ensure that the data is pulled in an orderly manner, which needs to be guaranteed by sourceTask in the business
+ producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys(), callback);
+ }
+
} catch (RetriableException e) {
log.warn("{} Failed to send record to topic '{}'. Backing off before retrying: ",
this, sourceMessage.getTopic(), e);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
index 60f1b5b..79f52c5 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
@@ -51,4 +51,5 @@
}
}
}
+
}