[improvement](transaction) reduce publish txn log (#28277)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index f0d1930..acb82ae 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -456,6 +456,10 @@
             + " dead lock" })
     public static boolean publish_version_check_alter_replica = true;
 
+    @ConfField(mutable = true, masterOnly = true, description = {"单个事务 publish 失败打日志间隔",
+            "print log interval for publish transaction failed interval"})
+    public static long publish_fail_log_interval_second = 5 * 60;
+
     @ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。"
             + "该参数仅用于事务型 insert 操作中。",
             "Maximal waiting time for all data inserted before one transaction to be committed, in seconds. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 6ef7d3e..2947125 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -948,19 +948,8 @@
 
         // add all commit errors and publish errors to a single set
         Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
-        Map<Long, PublishVersionTask> publishTasks = transactionState.getPublishVersionTasks();
 
-        long now = System.currentTimeMillis();
-        long firstPublishVersionTime = transactionState.getFirstPublishVersionTime();
-        boolean allowPublishOneSucc = false;
-        if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0
-                && now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L) {
-            allowPublishOneSucc = true;
-        }
-
-        List<Replica> tabletSuccReplicas = Lists.newArrayList();
-        List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
-        List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
+        List<Pair<OlapTable, Partition>> relatedTblPartitions = Lists.newArrayList();
 
         // case 1 If database is dropped, then we just throw MetaNotFoundException, because all related tables are
         // already force dropped, we just ignore the transaction with all tables been force dropped.
@@ -975,134 +964,12 @@
         LOG.debug("finish transaction {} with tables {}", transactionId, tableIdList);
         List<? extends TableIf> tableList = db.getTablesOnIdOrderIfExist(tableIdList);
         tableList = MetaLockUtils.writeLockTablesIfExist(tableList);
-        PublishResult publishResult = PublishResult.QUORUM_SUCC;
+        PublishResult publishResult;
         try {
-            Iterator<TableCommitInfo> tableCommitInfoIterator
-                    = transactionState.getIdToTableCommitInfos().values().iterator();
-            while (tableCommitInfoIterator.hasNext()) {
-                TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
-                long tableId = tableCommitInfo.getTableId();
-                OlapTable table = (OlapTable) db.getTableNullable(tableId);
-                // table maybe dropped between commit and publish, ignore this error
-                if (table == null) {
-                    tableCommitInfoIterator.remove();
-                    LOG.warn("table {} is dropped, skip version check and remove it from transaction state {}",
-                            tableId,
-                            transactionState);
-                    continue;
-                }
-
-                boolean alterReplicaLoadedTxn = isAlterReplicaLoadedTxn(transactionId, table);
-                Iterator<PartitionCommitInfo> partitionCommitInfoIterator
-                        = tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
-                while (partitionCommitInfoIterator.hasNext()) {
-                    PartitionCommitInfo partitionCommitInfo = partitionCommitInfoIterator.next();
-                    long partitionId = partitionCommitInfo.getPartitionId();
-                    Partition partition = table.getPartition(partitionId);
-                    // partition maybe dropped between commit and publish version, ignore this error
-                    if (partition == null) {
-                        partitionCommitInfoIterator.remove();
-                        LOG.warn("partition {} is dropped, skip version check"
-                                        + " and remove it from transaction state {}", partitionId, transactionState);
-                        continue;
-                    }
-                    if (partition.getVisibleVersion() != partitionCommitInfo.getVersion() - 1) {
-                        LOG.debug("transactionId {} partition commitInfo version {} is not equal with "
-                                        + "partition visible version {} plus one, need wait",
-                                transactionId,
-                                partitionCommitInfo.getVersion(),
-                                partition.getVisibleVersion());
-                        String errMsg = String.format("wait for publishing partition %d version %d."
-                                        + " self version: %d. table %d", partitionId, partition.getVisibleVersion() + 1,
-                                partitionCommitInfo.getVersion(), tableId);
-                        transactionState.setErrorMsg(errMsg);
-                        return;
-                    }
-
-                    int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partitionId);
-                    List<MaterializedIndex> allIndices;
-                    if (transactionState.getLoadedTblIndexes().isEmpty()) {
-                        allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
-                    } else {
-                        allIndices = Lists.newArrayList();
-                        for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) {
-                            MaterializedIndex index = partition.getIndex(indexId);
-                            if (index != null) {
-                                allIndices.add(index);
-                            }
-                        }
-                    }
-
-                    // check success replica number for each tablet.
-                    // a success replica means:
-                    //  1. Not in errorReplicaIds: succeed in both commit and publish phase
-                    //  2. last failed version < 0: is a health replica before
-                    //  3. version catch up: not with a stale version
-                    // Here we only check number, the replica version will be updated in updateCatalogAfterVisible()
-                    for (MaterializedIndex index : allIndices) {
-                        for (Tablet tablet : index.getTablets()) {
-                            tabletSuccReplicas.clear();
-                            tabletWriteFailedReplicas.clear();
-                            tabletVersionFailedReplicas.clear();
-                            for (Replica replica : tablet.getReplicas()) {
-                                checkReplicaContinuousVersionSucc(tablet.getId(), replica, alterReplicaLoadedTxn,
-                                        partitionCommitInfo.getVersion(), publishTasks.get(replica.getBackendId()),
-                                        errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas,
-                                        tabletVersionFailedReplicas);
-                            }
-
-                            int healthReplicaNum = tabletSuccReplicas.size();
-                            if (healthReplicaNum >= loadRequiredReplicaNum) {
-                                if (!tabletWriteFailedReplicas.isEmpty() || !tabletVersionFailedReplicas.isEmpty()) {
-                                    String writeDetail = getTabletWriteDetail(tabletSuccReplicas,
-                                            tabletWriteFailedReplicas, tabletVersionFailedReplicas);
-                                    LOG.info("publish version quorum succ for transaction {} on tablet {} with version"
-                                            + " {}, and has failed replicas, load require replica num {}. table {}, "
-                                            + "partition {}, tablet detail: {}",
-                                            transactionState, tablet.getId(), partitionCommitInfo.getVersion(),
-                                            loadRequiredReplicaNum, tableId, partitionId, writeDetail);
-                                }
-                                continue;
-                            }
-
-                            String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
-                                    tabletVersionFailedReplicas);
-                            if (allowPublishOneSucc && healthReplicaNum > 0) {
-                                if (publishResult == PublishResult.QUORUM_SUCC) {
-                                    publishResult = PublishResult.TIMEOUT_SUCC;
-                                }
-                                // We can not do any thing except retrying,
-                                // because publish task is assigned a version,
-                                // and doris does not permit discontinuous
-                                // versions.
-                                //
-                                // If a timeout happens, it means that the rowset
-                                // that are being publised exists on a few replicas we should go
-                                // ahead, otherwise data may be lost and thre
-                                // publish task hangs forever.
-                                LOG.info("publish version timeout succ for transaction {} on tablet {} with version"
-                                        + " {}, and has failed replicas, load require replica num {}. table {}, "
-                                        + "partition {}, tablet detail: {}",
-                                        transactionState, tablet.getId(), partitionCommitInfo.getVersion(),
-                                        loadRequiredReplicaNum, tableId, partitionId, writeDetail);
-                            } else {
-                                publishResult = PublishResult.FAILED;
-                                String errMsg = String.format("publish on tablet %d failed."
-                                                + " succeed replica num %d < load required replica num %d."
-                                                + " table: %d, partition: %d, publish version: %d",
-                                        tablet.getId(), healthReplicaNum, loadRequiredReplicaNum, tableId,
-                                        partitionId, partition.getVisibleVersion() + 1);
-                                transactionState.setErrorMsg(errMsg);
-                                LOG.info("publish version failed for transaction {} on tablet {} with version"
-                                        + " {}, and has failed replicas, load required replica num {}. table {}, "
-                                        + "partition {}, tablet detail: {}",
-                                        transactionState, tablet.getId(), partitionCommitInfo.getVersion(),
-                                        loadRequiredReplicaNum, tableId, partitionId, writeDetail);
-                            }
-                        }
-                    }
-                }
+            if (!finishCheckPartitionVersion(transactionState, db, relatedTblPartitions)) {
+                return;
             }
+            publishResult = finishCheckQuorumReplicas(transactionState, relatedTblPartitions, errorReplicaIds);
             if (publishResult == PublishResult.FAILED) {
                 return;
             }
@@ -1137,7 +1004,181 @@
         // Otherwise, there is no way for stream load to query the result right after loading finished,
         // even if we call "sync" before querying.
         transactionState.countdownVisibleLatch();
-        LOG.info("finish transaction {} successfully, publish result: {}", transactionState, publishResult.name());
+        LOG.info("finish transaction {} successfully, publish times {}, publish result {}",
+                transactionState, transactionState.getPublishCount(), publishResult.name());
+    }
+
+    private boolean finishCheckPartitionVersion(TransactionState transactionState, Database db,
+            List<Pair<OlapTable, Partition>> relatedTblPartitions) {
+        Iterator<TableCommitInfo> tableCommitInfoIterator
+                = transactionState.getIdToTableCommitInfos().values().iterator();
+        while (tableCommitInfoIterator.hasNext()) {
+            TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
+            long tableId = tableCommitInfo.getTableId();
+            OlapTable table = (OlapTable) db.getTableNullable(tableId);
+            // table maybe dropped between commit and publish, ignore this error
+            if (table == null) {
+                tableCommitInfoIterator.remove();
+                LOG.warn("table {} is dropped, skip version check and remove it from transaction state {}",
+                        tableId,
+                        transactionState);
+                continue;
+            }
+
+            Iterator<PartitionCommitInfo> partitionCommitInfoIterator
+                    = tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
+            while (partitionCommitInfoIterator.hasNext()) {
+                PartitionCommitInfo partitionCommitInfo = partitionCommitInfoIterator.next();
+                long partitionId = partitionCommitInfo.getPartitionId();
+                Partition partition = table.getPartition(partitionId);
+                // partition maybe dropped between commit and publish version, ignore this error
+                if (partition == null) {
+                    partitionCommitInfoIterator.remove();
+                    LOG.warn("partition {} is dropped, skip version check"
+                                    + " and remove it from transaction state {}", partitionId, transactionState);
+                    continue;
+                }
+                if (partition.getVisibleVersion() != partitionCommitInfo.getVersion() - 1) {
+                    LOG.debug("for table {} partition {}, transactionId {} partition commitInfo version {} is not"
+                            + " equal with partition visible version {} plus one, need wait",
+                            table.getId(), partition.getId(), transactionState.getTransactionId(),
+                            partitionCommitInfo.getVersion(), partition.getVisibleVersion());
+                    String errMsg = String.format("wait for publishing partition %d version %d."
+                                    + " self version: %d. table %d", partitionId, partition.getVisibleVersion() + 1,
+                            partitionCommitInfo.getVersion(), tableId);
+                    transactionState.setErrorMsg(errMsg);
+                    return false;
+                }
+
+                relatedTblPartitions.add(Pair.of(table, partition));
+            }
+        }
+
+        return true;
+    }
+
+    private PublishResult finishCheckQuorumReplicas(TransactionState transactionState,
+            List<Pair<OlapTable, Partition>> relatedTblPartitions,
+            Set<Long> errorReplicaIds) {
+        long now = System.currentTimeMillis();
+        long firstPublishVersionTime = transactionState.getFirstPublishVersionTime();
+        boolean allowPublishOneSucc = false;
+        if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0
+                && now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L) {
+            allowPublishOneSucc = true;
+        }
+
+        List<Replica> tabletSuccReplicas = Lists.newArrayList();
+        List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
+        List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
+        List<String> logs = Lists.newArrayList();
+
+        Map<Long, PublishVersionTask> publishTasks = transactionState.getPublishVersionTasks();
+        PublishResult publishResult = PublishResult.QUORUM_SUCC;
+        for (Pair<OlapTable, Partition> pair : relatedTblPartitions) {
+            OlapTable table = pair.key();
+            Partition partition = pair.value();
+            long tableId = table.getId();
+            long partitionId = partition.getId();
+            long newVersion = partition.getVisibleVersion() + 1;
+            int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partitionId);
+            List<MaterializedIndex> allIndices;
+            if (transactionState.getLoadedTblIndexes().isEmpty()) {
+                allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+            } else {
+                allIndices = Lists.newArrayList();
+                for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) {
+                    MaterializedIndex index = partition.getIndex(indexId);
+                    if (index != null) {
+                        allIndices.add(index);
+                    }
+                }
+            }
+
+            boolean alterReplicaLoadedTxn = isAlterReplicaLoadedTxn(transactionState.getTransactionId(), table);
+
+            // check success replica number for each tablet.
+            // a success replica means:
+            //  1. Not in errorReplicaIds: succeed in both commit and publish phase
+            //  2. last failed version < 0: is a health replica before
+            //  3. version catch up: not with a stale version
+            // Here we only check number, the replica version will be updated in updateCatalogAfterVisible()
+            for (MaterializedIndex index : allIndices) {
+                for (Tablet tablet : index.getTablets()) {
+                    tabletSuccReplicas.clear();
+                    tabletWriteFailedReplicas.clear();
+                    tabletVersionFailedReplicas.clear();
+                    for (Replica replica : tablet.getReplicas()) {
+                        checkReplicaContinuousVersionSucc(tablet.getId(), replica, alterReplicaLoadedTxn,
+                                newVersion, publishTasks.get(replica.getBackendId()),
+                                errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas,
+                                tabletVersionFailedReplicas);
+                    }
+
+                    int healthReplicaNum = tabletSuccReplicas.size();
+                    if (healthReplicaNum >= loadRequiredReplicaNum) {
+                        boolean hasFailedReplica = !tabletWriteFailedReplicas.isEmpty()
+                                || !tabletVersionFailedReplicas.isEmpty();
+                        if (hasFailedReplica) {
+                            String writeDetail = getTabletWriteDetail(tabletSuccReplicas,
+                                    tabletWriteFailedReplicas, tabletVersionFailedReplicas);
+                            logs.add(String.format("publish version quorum succ for transaction %s on tablet %s"
+                                    + " with version %s, and has failed replicas, load require replica num %s. "
+                                    + "table %s, partition %s, tablet detail: %s",
+                                    transactionState, tablet.getId(), newVersion,
+                                    loadRequiredReplicaNum, tableId, partitionId, writeDetail));
+                        }
+                        continue;
+                    }
+
+                    String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
+                            tabletVersionFailedReplicas);
+                    if (allowPublishOneSucc && healthReplicaNum > 0) {
+                        if (publishResult == PublishResult.QUORUM_SUCC) {
+                            publishResult = PublishResult.TIMEOUT_SUCC;
+                        }
+                        // We can not do any thing except retrying,
+                        // because publish task is assigned a version,
+                        // and doris does not permit discontinuous
+                        // versions.
+                        //
+                        // If a timeout happens, it means that the rowset
+                        // that are being publised exists on a few replicas we should go
+                        // ahead, otherwise data may be lost and thre
+                        // publish task hangs forever.
+                        logs.add(String.format("publish version timeout succ for transaction %s on tablet %s "
+                                + "with version %s, and has failed replicas, load require replica num %s. "
+                                + "table %s, partition %s, tablet detail: %s",
+                                transactionState, tablet.getId(), newVersion,
+                                loadRequiredReplicaNum, tableId, partitionId, writeDetail));
+                    } else {
+                        publishResult = PublishResult.FAILED;
+                        String errMsg = String.format("publish on tablet %d failed."
+                                        + " succeed replica num %d < load required replica num %d."
+                                        + " table: %d, partition: %d, publish version: %d",
+                                tablet.getId(), healthReplicaNum, loadRequiredReplicaNum, tableId,
+                                partitionId, newVersion);
+                        transactionState.setErrorMsg(errMsg);
+                        logs.add(String.format("publish version failed for transaction %s on tablet %s with version"
+                                + " %s, and has failed replicas, load required replica num %s. table %s, "
+                                + "partition %s, tablet detail: %s",
+                                transactionState, tablet.getId(), newVersion,
+                                loadRequiredReplicaNum, tableId, partitionId, writeDetail));
+                    }
+                }
+            }
+        }
+
+        boolean needLog = publishResult != PublishResult.FAILED
+                || now - transactionState.getLastPublishLogTime() > Config.publish_fail_log_interval_second * 1000L;
+        if (needLog) {
+            transactionState.setLastPublishLogTime(now);
+            for (String log : logs) {
+                LOG.info("{}. publish times {}", log, transactionState.getPublishCount());
+            }
+        }
+
+        return publishResult;
     }
 
     private boolean isAlterReplicaLoadedTxn(long transactionId, OlapTable table) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 9e2054f..c4fc6de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -229,6 +229,12 @@
 
     private long lastPublishVersionTime = -1;
 
+    private long publishCount = 0;
+
+    // txn may try finish many times and generate a lot of log.
+    // use lastPublishLogTime to reduce log.
+    private long lastPublishLogTime = 0;
+
     @SerializedName(value = "callbackId")
     private long callbackId = -1;
 
@@ -346,6 +352,7 @@
     }
 
     public void updateSendTaskTime() {
+        this.publishCount++;
         this.lastPublishVersionTime = System.currentTimeMillis();
         if (this.firstPublishVersionTime <= 0) {
             this.firstPublishVersionTime = lastPublishVersionTime;
@@ -360,6 +367,10 @@
         return this.lastPublishVersionTime;
     }
 
+    public long getPublishCount() {
+        return publishCount;
+    }
+
     public boolean hasSendTask() {
         return this.hasSendTask;
     }
@@ -428,6 +439,14 @@
         return errorLogUrl;
     }
 
+    public long getLastPublishLogTime() {
+        return lastPublishLogTime;
+    }
+
+    public void setLastPublishLogTime(long lastPublishLogTime) {
+        this.lastPublishLogTime = lastPublishLogTime;
+    }
+
     public void setTransactionStatus(TransactionStatus transactionStatus) {
         // status changed
         this.preStatus = this.transactionStatus;