[Replicator] Fix message duplication problem (#692)
Signed-off-by: zhangyang <Git_Yang@163.com>
diff --git a/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 3e8d78b..da7013a 100644
--- a/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -160,10 +160,10 @@
schema.getFields().add(new Field(0,
FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
- DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
- dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
for (MessageExt msg : msgs) {
+ DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+ dataEntryBuilder.timestamp(System.currentTimeMillis())
+ .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), new String(msg.getBody()));
SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
ByteBuffer.wrap(RmqConstants.getPartition(