[rocketmq-connect-kafka]: Complete the methods KafkaSourceTask#pause() and KafkaSourceTask#resume(). (#854)

[rocketmq-connect-kafka]: Normalize code style
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
index 567a8e9..680df6e 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
@@ -42,7 +42,7 @@
 
         log.info("KafkaSourceConnector verifyAndSetConfig enter");
         for (String key : config.keySet()) {
-            log.info("connector verifyAndSetConfig: key:{}, value:{}", key, config.getString(key));
+            log.info("connector verifyAndSetConfig: key: {}, value: {}", key, config.getString(key));
         }
 
         for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
index 6122b0e..f077ac0 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
@@ -132,11 +132,13 @@
     @Override
     public void pause() {
         log.info("source task pause ...");
+        consumer.pause(currentTPList);
     }
 
     @Override
     public void resume() {
         log.info("source task resume ...");
+        consumer.resume(currentTPList);
     }
 
     public String toString() {
@@ -179,7 +181,7 @@
         log.info("commitOffset {} topic partition {}", KafkaSourceTask.this, tpList);
         List<ByteBuffer> topic_partition_list = new ArrayList<>();
         for (TopicPartition tp : tpList) {
-            topic_partition_list.add(ByteBuffer.wrap((tp.topic()+"-"+tp.partition()).getBytes()));
+            topic_partition_list.add(ByteBuffer.wrap((tp.topic() + "-" + tp.partition()).getBytes()));
         }
 
         Map<TopicPartition, OffsetAndMetadata> commitOffsets = new HashMap<>();
@@ -198,7 +200,7 @@
         }
 
         commitOffsets.entrySet().stream().forEach((Map.Entry<TopicPartition, OffsetAndMetadata> entry) ->
-                log.info("commitOffset {}, TopicPartition:{} offset:{}", KafkaSourceTask.this, entry.getKey(), entry.getValue()));
+                log.info("commitOffset {}, TopicPartition: {} offset: {}", KafkaSourceTask.this, entry.getKey(), entry.getValue()));
         if (!commitOffsets.isEmpty()) {
             if (isClose) {
                 consumer.commitSync(commitOffsets);
@@ -213,9 +215,9 @@
         @Override
         public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
             if (e != null) {
-                log.warn("commit async excepiton {}", e);
+                log.warn("commit async exception", e);
                 map.entrySet().stream().forEach((Map.Entry<TopicPartition, OffsetAndMetadata> entry) -> {
-                    log.warn("commit exception, TopicPartition:{} offset: {}", entry.getKey().toString(), entry.getValue().offset());
+                    log.warn("commit exception, TopicPartition: {} offset: {}", entry.getKey().toString(), entry.getValue().offset());
                 });
                 return;
             }
@@ -229,9 +231,9 @@
 
             currentTPList.clear();
             for (TopicPartition tp : partitions) {
+                log.info("onPartitionsAssigned TopicPartition {}", tp);
                 currentTPList.add(tp);
             }
-            currentTPList.stream().forEach((TopicPartition tp)-> log.info("onPartitionsAssigned  TopicPartition {}", tp));
         }
 
         @Override