[rocketmq-connect-kafka]: Implement the methods KafkaSourceTask#pause() and KafkaSourceTask#resume(). (#854)
[rocketmq-connect-kafka]: Normalized code style
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
index 567a8e9..680df6e 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
@@ -42,7 +42,7 @@
log.info("KafkaSourceConnector verifyAndSetConfig enter");
for (String key : config.keySet()) {
- log.info("connector verifyAndSetConfig: key:{}, value:{}", key, config.getString(key));
+ log.info("connector verifyAndSetConfig: key: {}, value: {}", key, config.getString(key));
}
for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
index 6122b0e..f077ac0 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
@@ -132,11 +132,13 @@
@Override
public void pause() {
log.info("source task pause ...");
+ consumer.pause(currentTPList);
}
@Override
public void resume() {
log.info("source task resume ...");
+ consumer.resume(currentTPList);
}
public String toString() {
@@ -179,7 +181,7 @@
log.info("commitOffset {} topic partition {}", KafkaSourceTask.this, tpList);
List<ByteBuffer> topic_partition_list = new ArrayList<>();
for (TopicPartition tp : tpList) {
- topic_partition_list.add(ByteBuffer.wrap((tp.topic()+"-"+tp.partition()).getBytes()));
+ topic_partition_list.add(ByteBuffer.wrap((tp.topic() + "-" + tp.partition()).getBytes()));
}
Map<TopicPartition, OffsetAndMetadata> commitOffsets = new HashMap<>();
@@ -198,7 +200,7 @@
}
commitOffsets.entrySet().stream().forEach((Map.Entry<TopicPartition, OffsetAndMetadata> entry) ->
- log.info("commitOffset {}, TopicPartition:{} offset:{}", KafkaSourceTask.this, entry.getKey(), entry.getValue()));
+ log.info("commitOffset {}, TopicPartition: {} offset: {}", KafkaSourceTask.this, entry.getKey(), entry.getValue()));
if (!commitOffsets.isEmpty()) {
if (isClose) {
consumer.commitSync(commitOffsets);
@@ -213,9 +215,9 @@
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e != null) {
- log.warn("commit async excepiton {}", e);
+ log.warn("commit async excepiton", e);
map.entrySet().stream().forEach((Map.Entry<TopicPartition, OffsetAndMetadata> entry) -> {
- log.warn("commit exception, TopicPartition:{} offset: {}", entry.getKey().toString(), entry.getValue().offset());
+ log.warn("commit exception, TopicPartition: {} offset: {}", entry.getKey().toString(), entry.getValue().offset());
});
return;
}
@@ -229,9 +231,9 @@
currentTPList.clear();
for (TopicPartition tp : partitions) {
+ log.info("onPartitionsAssigned TopicPartition {}", tp);
currentTPList.add(tp);
}
- currentTPList.stream().forEach((TopicPartition tp)-> log.info("onPartitionsAssigned TopicPartition {}", tp));
}
@Override