Fix the problem of abnormal statistics under Bcq (#4567)

* Fix autoBatch not compatible with batchConsumeQueue

* Fix the problem of abnormal statistics under Bcq
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index be013e9..7baa9c6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -907,7 +907,7 @@
         PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
 
         // Statistics
-        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
+        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum());
         storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
 
         return handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA);
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 0da12a2..05b486d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -804,7 +804,7 @@
                                 continue;
                             }
 
-                            this.storeStatsService.getGetMessageTransferedMsgCount().add(1);
+                            this.storeStatsService.getGetMessageTransferedMsgCount().add(cqUnit.getBatchNum());
                             getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
                             status = GetMessageStatus.FOUND;
                             nextPhyFileStartOffset = Long.MIN_VALUE;
@@ -2457,7 +2457,7 @@
                                     if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
                                         DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                         DefaultMessageStore.this.storeStatsService
-                                            .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
+                                            .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(dispatchRequest.getBatchSize());
                                         DefaultMessageStore.this.storeStatsService
                                             .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                             .add(dispatchRequest.getMsgSize());