| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package sql |
| |
| import ( |
| "context" |
| "flag" |
| "time" |
| |
| "seata.apache.org/seata-go/pkg/rm" |
| |
| "github.com/prometheus/client_golang/prometheus" |
| "github.com/prometheus/client_golang/prometheus/promauto" |
| |
| "seata.apache.org/seata-go/pkg/datasource/sql/datasource" |
| "seata.apache.org/seata-go/pkg/datasource/sql/undo" |
| "seata.apache.org/seata-go/pkg/protocol/branch" |
| "seata.apache.org/seata-go/pkg/util/fanout" |
| "seata.apache.org/seata-go/pkg/util/log" |
| ) |
| |
| type phaseTwoContext struct { |
| Xid string |
| BranchID int64 |
| ResourceID string |
| } |
| |
| type AsyncWorkerConfig struct { |
| BufferLimit int `yaml:"buffer_limit" json:"buffer_limit"` |
| BufferCleanInterval time.Duration `yaml:"buffer_clean_interval" json:"buffer_clean_interval"` |
| ReceiveChanSize int `yaml:"receive_chan_size" json:"receive_chan_size"` |
| CommitWorkerCount int `yaml:"commit_worker_count" json:"commit_worker_count"` |
| CommitWorkerBufferSize int `yaml:"commit_worker_buffer_size" json:"commit_worker_buffer_size"` |
| } |
| |
| func (cfg *AsyncWorkerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { |
| f.IntVar(&cfg.BufferLimit, prefix+".buffer_size", 10000, "async worker commit buffer limit.") |
| f.DurationVar(&cfg.BufferCleanInterval, prefix+".buffer.clean_interval", time.Second, "async worker commit buffer interval") |
| f.IntVar(&cfg.ReceiveChanSize, prefix+".channel_size", 10000, "async worker commit channel size") |
| f.IntVar(&cfg.CommitWorkerCount, prefix+".worker_count", 10, "async worker commit worker count") |
| f.IntVar(&cfg.CommitWorkerBufferSize, prefix+".worker_buffer_size", 1000, "async worker commit worker buffer size") |
| } |
| |
| // AsyncWorker executor for branch transaction commit and undo log |
| type AsyncWorker struct { |
| conf AsyncWorkerConfig |
| |
| commitQueue chan phaseTwoContext |
| resourceMgr datasource.DataSourceManager |
| commitWorker *fanout.Fanout |
| |
| branchCommitTotal prometheus.Counter |
| doBranchCommitFailureTotal prometheus.Counter |
| receiveChanLength prometheus.Gauge |
| rePutBackToQueue prometheus.Counter |
| } |
| |
| func NewAsyncWorker(prom prometheus.Registerer, conf AsyncWorkerConfig, sourceManager datasource.DataSourceManager) *AsyncWorker { |
| var asyncWorker AsyncWorker |
| asyncWorker.conf = conf |
| asyncWorker.commitQueue = make(chan phaseTwoContext, asyncWorker.conf.ReceiveChanSize) |
| asyncWorker.resourceMgr = sourceManager |
| asyncWorker.commitWorker = fanout.New("asyncWorker", |
| fanout.WithWorker(asyncWorker.conf.CommitWorkerCount), |
| fanout.WithBuffer(asyncWorker.conf.CommitWorkerBufferSize), |
| ) |
| |
| asyncWorker.branchCommitTotal = promauto.With(prom).NewCounter(prometheus.CounterOpts{ |
| Name: "async_worker_branch_commit_total", |
| Help: "the total count of branch commit total count", |
| }) |
| asyncWorker.doBranchCommitFailureTotal = promauto.With(prom).NewCounter(prometheus.CounterOpts{ |
| Name: "async_worker_branch_commit_failure_total", |
| Help: "the total count of branch commit failure count", |
| }) |
| asyncWorker.receiveChanLength = promauto.With(prom).NewGauge(prometheus.GaugeOpts{ |
| Name: "async_worker_receive_channel_length", |
| Help: "the current length of the receive channel size", |
| }) |
| asyncWorker.rePutBackToQueue = promauto.With(prom).NewCounter(prometheus.CounterOpts{ |
| Name: "async_worker_commit_failure_retry_counter", |
| Help: "the counter of commit failure retry counter", |
| }) |
| |
| go asyncWorker.run() |
| |
| return &asyncWorker |
| } |
| |
| // BranchCommit commit branch transaction |
| func (aw *AsyncWorker) BranchCommit(ctx context.Context, req rm.BranchResource) (branch.BranchStatus, error) { |
| phaseCtx := phaseTwoContext{ |
| Xid: req.Xid, |
| BranchID: req.BranchId, |
| ResourceID: req.ResourceId, |
| } |
| |
| aw.branchCommitTotal.Add(1) |
| |
| select { |
| case aw.commitQueue <- phaseCtx: |
| case <-ctx.Done(): |
| } |
| |
| aw.receiveChanLength.Add(float64(len(aw.commitQueue))) |
| |
| return branch.BranchStatusPhasetwoCommitted, nil |
| } |
| |
| func (aw *AsyncWorker) run() { |
| ticker := time.NewTicker(aw.conf.BufferCleanInterval) |
| phaseCtxs := make([]phaseTwoContext, 0, aw.conf.BufferLimit) |
| for { |
| select { |
| case phaseCtx := <-aw.commitQueue: |
| phaseCtxs = append(phaseCtxs, phaseCtx) |
| if len(phaseCtxs) >= aw.conf.BufferLimit*2/3 { |
| aw.doBranchCommit(&phaseCtxs) |
| } |
| case <-ticker.C: |
| aw.doBranchCommit(&phaseCtxs) |
| } |
| } |
| } |
| |
| func (aw *AsyncWorker) doBranchCommit(phaseCtxs *[]phaseTwoContext) { |
| if len(*phaseCtxs) == 0 { |
| return |
| } |
| |
| copyPhaseCtxs := make([]phaseTwoContext, len(*phaseCtxs)) |
| copy(copyPhaseCtxs, *phaseCtxs) |
| *phaseCtxs = (*phaseCtxs)[:0] |
| |
| doBranchCommit := func(ctx context.Context) { |
| groupCtxs := make(map[string][]phaseTwoContext, 16) |
| for i := range copyPhaseCtxs { |
| if copyPhaseCtxs[i].ResourceID == "" { |
| continue |
| } |
| |
| if _, ok := groupCtxs[copyPhaseCtxs[i].ResourceID]; !ok { |
| groupCtxs[copyPhaseCtxs[i].ResourceID] = make([]phaseTwoContext, 0, 4) |
| } |
| |
| ctxs := groupCtxs[copyPhaseCtxs[i].ResourceID] |
| ctxs = append(ctxs, copyPhaseCtxs[i]) |
| groupCtxs[copyPhaseCtxs[i].ResourceID] = ctxs |
| } |
| |
| for k := range groupCtxs { |
| aw.dealWithGroupedContexts(k, groupCtxs[k]) |
| } |
| } |
| |
| if err := aw.commitWorker.Do(context.Background(), doBranchCommit); err != nil { |
| aw.doBranchCommitFailureTotal.Add(1) |
| log.Errorf("do branch commit err:%v,phaseCtxs=%v", err, phaseCtxs) |
| } |
| } |
| |
| func (aw *AsyncWorker) dealWithGroupedContexts(resID string, phaseCtxs []phaseTwoContext) { |
| val, ok := aw.resourceMgr.GetCachedResources().Load(resID) |
| if !ok { |
| for i := range phaseCtxs { |
| aw.rePutBackToQueue.Add(1) |
| aw.commitQueue <- phaseCtxs[i] |
| } |
| return |
| } |
| |
| res := val.(*DBResource) |
| conn, err := res.db.Conn(context.Background()) |
| if err != nil { |
| for i := range phaseCtxs { |
| aw.commitQueue <- phaseCtxs[i] |
| } |
| } |
| |
| defer conn.Close() |
| |
| undoMgr, err := undo.GetUndoLogManager(res.dbType) |
| if err != nil { |
| for i := range phaseCtxs { |
| aw.rePutBackToQueue.Add(1) |
| aw.commitQueue <- phaseCtxs[i] |
| } |
| return |
| } |
| |
| for i := range phaseCtxs { |
| phaseCtx := phaseCtxs[i] |
| if err := undoMgr.BatchDeleteUndoLog([]string{phaseCtx.Xid}, []int64{phaseCtx.BranchID}, conn); err != nil { |
| aw.rePutBackToQueue.Add(1) |
| aw.commitQueue <- phaseCtx |
| } |
| } |
| } |