[improvement](transaction) reduce publish txn log (#28277)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index f0d1930..acb82ae 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -456,6 +456,10 @@
+ " dead lock" })
public static boolean publish_version_check_alter_replica = true;
+ @ConfField(mutable = true, masterOnly = true, description = {"单个事务 publish 失败打日志间隔",
+ "print log interval for publish transaction failed interval"})
+ public static long publish_fail_log_interval_second = 5 * 60;
+
@ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。"
+ "该参数仅用于事务型 insert 操作中。",
"Maximal waiting time for all data inserted before one transaction to be committed, in seconds. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 6ef7d3e..2947125 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -948,19 +948,8 @@
// add all commit errors and publish errors to a single set
Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
- Map<Long, PublishVersionTask> publishTasks = transactionState.getPublishVersionTasks();
- long now = System.currentTimeMillis();
- long firstPublishVersionTime = transactionState.getFirstPublishVersionTime();
- boolean allowPublishOneSucc = false;
- if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0
- && now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L) {
- allowPublishOneSucc = true;
- }
-
- List<Replica> tabletSuccReplicas = Lists.newArrayList();
- List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
- List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
+ List<Pair<OlapTable, Partition>> relatedTblPartitions = Lists.newArrayList();
// case 1 If database is dropped, then we just throw MetaNotFoundException, because all related tables are
// already force dropped, we just ignore the transaction with all tables been force dropped.
@@ -975,134 +964,12 @@
LOG.debug("finish transaction {} with tables {}", transactionId, tableIdList);
List<? extends TableIf> tableList = db.getTablesOnIdOrderIfExist(tableIdList);
tableList = MetaLockUtils.writeLockTablesIfExist(tableList);
- PublishResult publishResult = PublishResult.QUORUM_SUCC;
+ PublishResult publishResult;
try {
- Iterator<TableCommitInfo> tableCommitInfoIterator
- = transactionState.getIdToTableCommitInfos().values().iterator();
- while (tableCommitInfoIterator.hasNext()) {
- TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
- long tableId = tableCommitInfo.getTableId();
- OlapTable table = (OlapTable) db.getTableNullable(tableId);
- // table maybe dropped between commit and publish, ignore this error
- if (table == null) {
- tableCommitInfoIterator.remove();
- LOG.warn("table {} is dropped, skip version check and remove it from transaction state {}",
- tableId,
- transactionState);
- continue;
- }
-
- boolean alterReplicaLoadedTxn = isAlterReplicaLoadedTxn(transactionId, table);
- Iterator<PartitionCommitInfo> partitionCommitInfoIterator
- = tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
- while (partitionCommitInfoIterator.hasNext()) {
- PartitionCommitInfo partitionCommitInfo = partitionCommitInfoIterator.next();
- long partitionId = partitionCommitInfo.getPartitionId();
- Partition partition = table.getPartition(partitionId);
- // partition maybe dropped between commit and publish version, ignore this error
- if (partition == null) {
- partitionCommitInfoIterator.remove();
- LOG.warn("partition {} is dropped, skip version check"
- + " and remove it from transaction state {}", partitionId, transactionState);
- continue;
- }
- if (partition.getVisibleVersion() != partitionCommitInfo.getVersion() - 1) {
- LOG.debug("transactionId {} partition commitInfo version {} is not equal with "
- + "partition visible version {} plus one, need wait",
- transactionId,
- partitionCommitInfo.getVersion(),
- partition.getVisibleVersion());
- String errMsg = String.format("wait for publishing partition %d version %d."
- + " self version: %d. table %d", partitionId, partition.getVisibleVersion() + 1,
- partitionCommitInfo.getVersion(), tableId);
- transactionState.setErrorMsg(errMsg);
- return;
- }
-
- int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partitionId);
- List<MaterializedIndex> allIndices;
- if (transactionState.getLoadedTblIndexes().isEmpty()) {
- allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
- } else {
- allIndices = Lists.newArrayList();
- for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) {
- MaterializedIndex index = partition.getIndex(indexId);
- if (index != null) {
- allIndices.add(index);
- }
- }
- }
-
- // check success replica number for each tablet.
- // a success replica means:
- // 1. Not in errorReplicaIds: succeed in both commit and publish phase
- // 2. last failed version < 0: is a health replica before
- // 3. version catch up: not with a stale version
- // Here we only check number, the replica version will be updated in updateCatalogAfterVisible()
- for (MaterializedIndex index : allIndices) {
- for (Tablet tablet : index.getTablets()) {
- tabletSuccReplicas.clear();
- tabletWriteFailedReplicas.clear();
- tabletVersionFailedReplicas.clear();
- for (Replica replica : tablet.getReplicas()) {
- checkReplicaContinuousVersionSucc(tablet.getId(), replica, alterReplicaLoadedTxn,
- partitionCommitInfo.getVersion(), publishTasks.get(replica.getBackendId()),
- errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas,
- tabletVersionFailedReplicas);
- }
-
- int healthReplicaNum = tabletSuccReplicas.size();
- if (healthReplicaNum >= loadRequiredReplicaNum) {
- if (!tabletWriteFailedReplicas.isEmpty() || !tabletVersionFailedReplicas.isEmpty()) {
- String writeDetail = getTabletWriteDetail(tabletSuccReplicas,
- tabletWriteFailedReplicas, tabletVersionFailedReplicas);
- LOG.info("publish version quorum succ for transaction {} on tablet {} with version"
- + " {}, and has failed replicas, load require replica num {}. table {}, "
- + "partition {}, tablet detail: {}",
- transactionState, tablet.getId(), partitionCommitInfo.getVersion(),
- loadRequiredReplicaNum, tableId, partitionId, writeDetail);
- }
- continue;
- }
-
- String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
- tabletVersionFailedReplicas);
- if (allowPublishOneSucc && healthReplicaNum > 0) {
- if (publishResult == PublishResult.QUORUM_SUCC) {
- publishResult = PublishResult.TIMEOUT_SUCC;
- }
- // We can not do any thing except retrying,
- // because publish task is assigned a version,
- // and doris does not permit discontinuous
- // versions.
- //
- // If a timeout happens, it means that the rowset
- // that are being publised exists on a few replicas we should go
- // ahead, otherwise data may be lost and thre
- // publish task hangs forever.
- LOG.info("publish version timeout succ for transaction {} on tablet {} with version"
- + " {}, and has failed replicas, load require replica num {}. table {}, "
- + "partition {}, tablet detail: {}",
- transactionState, tablet.getId(), partitionCommitInfo.getVersion(),
- loadRequiredReplicaNum, tableId, partitionId, writeDetail);
- } else {
- publishResult = PublishResult.FAILED;
- String errMsg = String.format("publish on tablet %d failed."
- + " succeed replica num %d < load required replica num %d."
- + " table: %d, partition: %d, publish version: %d",
- tablet.getId(), healthReplicaNum, loadRequiredReplicaNum, tableId,
- partitionId, partition.getVisibleVersion() + 1);
- transactionState.setErrorMsg(errMsg);
- LOG.info("publish version failed for transaction {} on tablet {} with version"
- + " {}, and has failed replicas, load required replica num {}. table {}, "
- + "partition {}, tablet detail: {}",
- transactionState, tablet.getId(), partitionCommitInfo.getVersion(),
- loadRequiredReplicaNum, tableId, partitionId, writeDetail);
- }
- }
- }
- }
+ if (!finishCheckPartitionVersion(transactionState, db, relatedTblPartitions)) {
+ return;
}
+ publishResult = finishCheckQuorumReplicas(transactionState, relatedTblPartitions, errorReplicaIds);
if (publishResult == PublishResult.FAILED) {
return;
}
@@ -1137,7 +1004,181 @@
// Otherwise, there is no way for stream load to query the result right after loading finished,
// even if we call "sync" before querying.
transactionState.countdownVisibleLatch();
- LOG.info("finish transaction {} successfully, publish result: {}", transactionState, publishResult.name());
+ LOG.info("finish transaction {} successfully, publish times {}, publish result {}",
+ transactionState, transactionState.getPublishCount(), publishResult.name());
+ }
+
+ private boolean finishCheckPartitionVersion(TransactionState transactionState, Database db,
+ List<Pair<OlapTable, Partition>> relatedTblPartitions) {
+ Iterator<TableCommitInfo> tableCommitInfoIterator
+ = transactionState.getIdToTableCommitInfos().values().iterator();
+ while (tableCommitInfoIterator.hasNext()) {
+ TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
+ long tableId = tableCommitInfo.getTableId();
+ OlapTable table = (OlapTable) db.getTableNullable(tableId);
+ // table maybe dropped between commit and publish, ignore this error
+ if (table == null) {
+ tableCommitInfoIterator.remove();
+ LOG.warn("table {} is dropped, skip version check and remove it from transaction state {}",
+ tableId,
+ transactionState);
+ continue;
+ }
+
+ Iterator<PartitionCommitInfo> partitionCommitInfoIterator
+ = tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
+ while (partitionCommitInfoIterator.hasNext()) {
+ PartitionCommitInfo partitionCommitInfo = partitionCommitInfoIterator.next();
+ long partitionId = partitionCommitInfo.getPartitionId();
+ Partition partition = table.getPartition(partitionId);
+ // partition maybe dropped between commit and publish version, ignore this error
+ if (partition == null) {
+ partitionCommitInfoIterator.remove();
+ LOG.warn("partition {} is dropped, skip version check"
+ + " and remove it from transaction state {}", partitionId, transactionState);
+ continue;
+ }
+ if (partition.getVisibleVersion() != partitionCommitInfo.getVersion() - 1) {
+ LOG.debug("for table {} partition {}, transactionId {} partition commitInfo version {} is not"
+ + " equal with partition visible version {} plus one, need wait",
+ table.getId(), partition.getId(), transactionState.getTransactionId(),
+ partitionCommitInfo.getVersion(), partition.getVisibleVersion());
+ String errMsg = String.format("wait for publishing partition %d version %d."
+ + " self version: %d. table %d", partitionId, partition.getVisibleVersion() + 1,
+ partitionCommitInfo.getVersion(), tableId);
+ transactionState.setErrorMsg(errMsg);
+ return false;
+ }
+
+ relatedTblPartitions.add(Pair.of(table, partition));
+ }
+ }
+
+ return true;
+ }
+
+ private PublishResult finishCheckQuorumReplicas(TransactionState transactionState,
+ List<Pair<OlapTable, Partition>> relatedTblPartitions,
+ Set<Long> errorReplicaIds) {
+ long now = System.currentTimeMillis();
+ long firstPublishVersionTime = transactionState.getFirstPublishVersionTime();
+ boolean allowPublishOneSucc = false;
+ if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0
+ && now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L) {
+ allowPublishOneSucc = true;
+ }
+
+ List<Replica> tabletSuccReplicas = Lists.newArrayList();
+ List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
+ List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
+ List<String> logs = Lists.newArrayList();
+
+ Map<Long, PublishVersionTask> publishTasks = transactionState.getPublishVersionTasks();
+ PublishResult publishResult = PublishResult.QUORUM_SUCC;
+ for (Pair<OlapTable, Partition> pair : relatedTblPartitions) {
+ OlapTable table = pair.key();
+ Partition partition = pair.value();
+ long tableId = table.getId();
+ long partitionId = partition.getId();
+ long newVersion = partition.getVisibleVersion() + 1;
+ int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partitionId);
+ List<MaterializedIndex> allIndices;
+ if (transactionState.getLoadedTblIndexes().isEmpty()) {
+ allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+ } else {
+ allIndices = Lists.newArrayList();
+ for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) {
+ MaterializedIndex index = partition.getIndex(indexId);
+ if (index != null) {
+ allIndices.add(index);
+ }
+ }
+ }
+
+ boolean alterReplicaLoadedTxn = isAlterReplicaLoadedTxn(transactionState.getTransactionId(), table);
+
+ // check success replica number for each tablet.
+ // a success replica means:
+ // 1. Not in errorReplicaIds: succeed in both commit and publish phase
+ // 2. last failed version < 0: is a health replica before
+ // 3. version catch up: not with a stale version
+ // Here we only check number, the replica version will be updated in updateCatalogAfterVisible()
+ for (MaterializedIndex index : allIndices) {
+ for (Tablet tablet : index.getTablets()) {
+ tabletSuccReplicas.clear();
+ tabletWriteFailedReplicas.clear();
+ tabletVersionFailedReplicas.clear();
+ for (Replica replica : tablet.getReplicas()) {
+ checkReplicaContinuousVersionSucc(tablet.getId(), replica, alterReplicaLoadedTxn,
+ newVersion, publishTasks.get(replica.getBackendId()),
+ errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas,
+ tabletVersionFailedReplicas);
+ }
+
+ int healthReplicaNum = tabletSuccReplicas.size();
+ if (healthReplicaNum >= loadRequiredReplicaNum) {
+ boolean hasFailedReplica = !tabletWriteFailedReplicas.isEmpty()
+ || !tabletVersionFailedReplicas.isEmpty();
+ if (hasFailedReplica) {
+ String writeDetail = getTabletWriteDetail(tabletSuccReplicas,
+ tabletWriteFailedReplicas, tabletVersionFailedReplicas);
+ logs.add(String.format("publish version quorum succ for transaction %s on tablet %s"
+ + " with version %s, and has failed replicas, load require replica num %s. "
+ + "table %s, partition %s, tablet detail: %s",
+ transactionState, tablet.getId(), newVersion,
+ loadRequiredReplicaNum, tableId, partitionId, writeDetail));
+ }
+ continue;
+ }
+
+ String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
+ tabletVersionFailedReplicas);
+ if (allowPublishOneSucc && healthReplicaNum > 0) {
+ if (publishResult == PublishResult.QUORUM_SUCC) {
+ publishResult = PublishResult.TIMEOUT_SUCC;
+ }
+ // We can not do any thing except retrying,
+ // because publish task is assigned a version,
+ // and doris does not permit discontinuous
+ // versions.
+ //
+ // If a timeout happens, it means that the rowset
+ // that are being publised exists on a few replicas we should go
+ // ahead, otherwise data may be lost and thre
+ // publish task hangs forever.
+ logs.add(String.format("publish version timeout succ for transaction %s on tablet %s "
+ + "with version %s, and has failed replicas, load require replica num %s. "
+ + "table %s, partition %s, tablet detail: %s",
+ transactionState, tablet.getId(), newVersion,
+ loadRequiredReplicaNum, tableId, partitionId, writeDetail));
+ } else {
+ publishResult = PublishResult.FAILED;
+ String errMsg = String.format("publish on tablet %d failed."
+ + " succeed replica num %d < load required replica num %d."
+ + " table: %d, partition: %d, publish version: %d",
+ tablet.getId(), healthReplicaNum, loadRequiredReplicaNum, tableId,
+ partitionId, newVersion);
+ transactionState.setErrorMsg(errMsg);
+ logs.add(String.format("publish version failed for transaction %s on tablet %s with version"
+ + " %s, and has failed replicas, load required replica num %s. table %s, "
+ + "partition %s, tablet detail: %s",
+ transactionState, tablet.getId(), newVersion,
+ loadRequiredReplicaNum, tableId, partitionId, writeDetail));
+ }
+ }
+ }
+ }
+
+ boolean needLog = publishResult != PublishResult.FAILED
+ || now - transactionState.getLastPublishLogTime() > Config.publish_fail_log_interval_second * 1000L;
+ if (needLog) {
+ transactionState.setLastPublishLogTime(now);
+ for (String log : logs) {
+ LOG.info("{}. publish times {}", log, transactionState.getPublishCount());
+ }
+ }
+
+ return publishResult;
}
private boolean isAlterReplicaLoadedTxn(long transactionId, OlapTable table) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 9e2054f..c4fc6de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -229,6 +229,12 @@
private long lastPublishVersionTime = -1;
+ private long publishCount = 0;
+
+ // txn may try finish many times and generate a lot of log.
+ // use lastPublishLogTime to reduce log.
+ private long lastPublishLogTime = 0;
+
@SerializedName(value = "callbackId")
private long callbackId = -1;
@@ -346,6 +352,7 @@
}
public void updateSendTaskTime() {
+ this.publishCount++;
this.lastPublishVersionTime = System.currentTimeMillis();
if (this.firstPublishVersionTime <= 0) {
this.firstPublishVersionTime = lastPublishVersionTime;
@@ -360,6 +367,10 @@
return this.lastPublishVersionTime;
}
+ public long getPublishCount() {
+ return publishCount;
+ }
+
public boolean hasSendTask() {
return this.hasSendTask;
}
@@ -428,6 +439,14 @@
return errorLogUrl;
}
+ public long getLastPublishLogTime() {
+ return lastPublishLogTime;
+ }
+
+ public void setLastPublishLogTime(long lastPublishLogTime) {
+ this.lastPublishLogTime = lastPublishLogTime;
+ }
+
public void setTransactionStatus(TransactionStatus transactionStatus) {
// status changed
this.preStatus = this.transactionStatus;