[Replicator] Fix message duplication problem (#692)

Signed-off-by: zhangyang <Git_Yang@163.com>
diff --git a/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 3e8d78b..da7013a 100644
--- a/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -160,10 +160,10 @@
                             schema.getFields().add(new Field(0,
                                 FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
 
-                            DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
-                            dataEntryBuilder.timestamp(System.currentTimeMillis())
-                                .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
                             for (MessageExt msg : msgs) {
+                                DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+                                dataEntryBuilder.timestamp(System.currentTimeMillis())
+                                        .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
                                 dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), new String(msg.getBody()));
                                 SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
                                     ByteBuffer.wrap(RmqConstants.getPartition(