[ISSUE #233]WorkerSourceTask send messages to support partition ordering  (#237)

* Fix debezium demecial type conversion problem #190

* Upgrade rocketmq-replicator API to v0.1.3 #189

* Encountered change event for table databasename.tablename whose schema isn`t known to this connector #191

* Debezium mysql source connector delete event causes null pointer #196

* remove local config

* Debezium mysql source connector delete event causes null pointer #196

* Rocketmq replicator running null pointer #205

* WorkerSourceTask send messages to support partition ordering #233

* WorkerSourceTask send messages to support partition ordering #233

* test send message by key

* remove unused code
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 40c7eac..5d4e98e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -34,6 +34,7 @@
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageAccessor;
@@ -215,7 +216,8 @@
             /**prepare to send record*/
             Optional<RecordOffsetManagement.SubmittedPosition> submittedRecordPosition = prepareToSendRecord(preTransformRecord);
             try {
-                producer.send(sourceMessage, new SendCallback() {
+
+                SendCallback callback =  new SendCallback() {
                     @Override
                     public void onSuccess(SendResult result) {
                         log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic());
@@ -235,7 +237,17 @@
                         // record send failed
                         recordSendFailed(false, sourceMessage, preTransformRecord, throwable);
                     }
-                });
+                };
+
+                if (StringUtils.isEmpty(sourceMessage.getKeys())) {
+                    // Round robin
+                    producer.send(sourceMessage, callback);
+                } else {
+                    // Partition message ordering,
+                    // At the same time, ensure that the data is pulled in an orderly manner, which needs to be guaranteed by sourceTask in the business
+                    producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys(), callback);
+                }
+
             } catch (RetriableException e) {
                 log.warn("{} Failed to send record to topic '{}'. Backing off before retrying: ",
                     this, sourceMessage.getTopic(), e);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
index 60f1b5b..79f52c5 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
@@ -51,4 +51,5 @@
             }
         }
     }
+
 }