// blob: a224f6e2b1b24036ae95ae73dfce7a8bf90d9b70 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seata
import (
"bytes"
"context"
"fmt"
"io"
netHttp "net/http"
"strconv"
)
import (
"github.com/opentrx/seata-golang/v2/pkg/apis"
"github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/context/http"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/server"
)
// handleHttp1GlobalBegin starts a global transaction for the request described
// by transactionInfo and attaches the resulting XID to the request.
//
// On success the XID is stored in ctx.Params and in the request header, and
// true is returned so the caller continues processing. On failure a 500 local
// reply is sent and false is returned.
func (f *Filter) handleHttp1GlobalBegin(ctx *http.HttpContext, transactionInfo *TransactionInfo) bool {
	// todo support transaction isolation level
	xid, err := f.globalBegin(ctx.Ctx, transactionInfo.RequestPath, transactionInfo.Timeout)
	if err != nil {
		logger.Errorf("failed to begin global transaction, transaction info: %v, err: %v",
			transactionInfo, err)
		ctx.SendLocalReply(netHttp.StatusInternalServerError,
			[]byte(fmt.Sprintf("failed to begin global transaction, %v", err)))
		return false
	}

	ctx.Params[XID] = xid
	// Set, not Add: Header.Get returns the first value for a key, so appending
	// would let an XID header already present on the inbound request shadow the
	// one that was just created.
	ctx.Request.Header.Set(XID, xid)
	return true
}
// handleHttp1GlobalEnd finishes the global transaction whose XID is stored in
// ctx.Params. The transaction is committed only when ctx.SourceResp is a
// *net/http.Response with status 200; in every other case it is rolled back.
// Commit/rollback failures are logged, not returned.
func (f *Filter) handleHttp1GlobalEnd(ctx *http.HttpContext) {
	// A checked assertion avoids a panic when the XID param is absent or is
	// not a string.
	xid, ok := ctx.Params[XID].(string)
	if !ok {
		logger.Error("failed to get xid from context params, skip ending global transaction")
		return
	}

	var err error
	resp, isHTTP := ctx.SourceResp.(*netHttp.Response)
	if isHTTP && resp != nil && resp.StatusCode == netHttp.StatusOK {
		err = f.globalCommit(ctx, xid)
	} else {
		// Non-200 status, a missing response, or an unexpected response type
		// all lead to rollback.
		err = f.globalRollback(ctx, xid)
	}
	if err != nil {
		logger.Error(err)
	}
}
// handleHttp1BranchRegister registers a TCC branch transaction for the current
// request under the XID carried in the request header.
//
// The request headers, body, trailers, the picked endpoint address, the
// commit/rollback paths of tccResource and the raw query string are encoded
// into the branch's application data. On success the XID and the branch ID are
// stored in hctx.Params and true is returned so the caller continues. On any
// failure a local reply is sent and false is returned.
func (f *Filter) handleHttp1BranchRegister(hctx *http.HttpContext, tccResource *TCCResource) bool {
	xid := hctx.Request.Header.Get(XID)
	if xid == "" {
		logger.Error("failed to get xid from request header")
		hctx.SendLocalReply(netHttp.StatusInternalServerError, []byte("failed to get xid from request header"))
		return false
	}

	bodyBytes, err := io.ReadAll(hctx.Request.Body)
	if err != nil {
		logger.Error(err)
		hctx.SendLocalReply(netHttp.StatusInternalServerError, []byte("failed to retrieve request body"))
		return false
	}

	requestContext := &RequestContext{
		ActionContext: make(map[string]string),
		Headers:       hctx.Request.Header.Clone(),
		Body:          bodyBytes,
		Trailers:      hctx.Request.Trailer.Clone(),
	}

	// The body has been consumed above; install a fresh reader over the same
	// bytes so the request that is sent afterwards still carries the body.
	hctx.Request.Body = io.NopCloser(bytes.NewReader(bodyBytes))

	rEntry := hctx.GetRouteEntry()
	if rEntry == nil {
		// Answer like every other failure in this function instead of
		// panicking on the request path.
		logger.Error("no route entry")
		hctx.SendLocalReply(netHttp.StatusInternalServerError, []byte("no route entry"))
		return false
	}
	logger.Debugf("[dubbo-go-pixiu] client choose endpoint from cluster :%v", rEntry.Cluster)

	clusterName := rEntry.Cluster
	clusterManager := server.GetClusterManager()
	endpoint := clusterManager.PickEndpoint(clusterName, hctx)
	if endpoint == nil {
		hctx.SendLocalReply(netHttp.StatusServiceUnavailable, []byte("cluster not found endpoint"))
		return false
	}

	requestContext.ActionContext[VarHost] = endpoint.Address.GetAddress()
	requestContext.ActionContext[CommitRequestPath] = tccResource.CommitRequestPath
	requestContext.ActionContext[RollbackRequestPath] = tccResource.RollbackRequestPath
	if queryString := hctx.Request.URL.RawQuery; queryString != "" {
		requestContext.ActionContext[VarQueryString] = queryString
	}

	data, err := requestContext.Encode()
	if err != nil {
		logger.Errorf("encode request context failed, request context: %v, err: %v", requestContext, err)
		hctx.SendLocalReply(netHttp.StatusInternalServerError,
			[]byte(fmt.Sprintf("encode request context failed, %v", err)))
		return false
	}

	// Bound the registration call by the request's timeout.
	ctx, cancel := context.WithTimeout(hctx.Ctx, hctx.Timeout)
	defer cancel()

	branchID, err := f.branchRegister(ctx, xid, tccResource.PrepareRequestPath, apis.TCC, data, "")
	if err != nil {
		logger.Errorf("branch transaction register failed, xid: %s, err: %v", xid, err)
		hctx.SendLocalReply(netHttp.StatusInternalServerError,
			[]byte(fmt.Sprintf("branch transaction register failed, %v", err)))
		return false
	}

	hctx.Params[XID] = xid
	hctx.Params[BranchID] = strconv.FormatInt(branchID, 10)
	return true
}
// handleHttp1BranchEnd reports the phase-one result of the TCC branch whose XID
// and branch ID are stored in ctx.Params.
//
// Nothing is reported when ctx.SourceResp is a *net/http.Response with status
// 200. In every other case the branch is reported as apis.PhaseOneFailed.
// Report failures are logged, not returned.
func (f *Filter) handleHttp1BranchEnd(ctx *http.HttpContext) {
	// Checked assertions avoid a panic when a param is absent or not a string.
	xid, ok := ctx.Params[XID].(string)
	if !ok {
		logger.Error("failed to get xid from context params, skip branch report")
		return
	}
	rawBranchID, ok := ctx.Params[BranchID].(string)
	if !ok {
		logger.Errorf("failed to get branch id from context params, xid: %s, skip branch report", xid)
		return
	}

	branchID, err := strconv.ParseInt(rawBranchID, 10, 64)
	if err != nil {
		// Without a valid branch ID the report would target branch 0, so stop
		// here instead of continuing with the zero value.
		logger.Error(err)
		return
	}

	resp, isHTTP := ctx.SourceResp.(*netHttp.Response)
	if isHTTP && resp != nil && resp.StatusCode == netHttp.StatusOK {
		return
	}

	if err := f.branchReport(ctx.Ctx, xid, branchID, apis.TCC, apis.PhaseOneFailed, nil); err != nil {
		logger.Error(err)
	}
}
// globalBegin sends a GlobalBeginRequest with the given transaction name and
// timeout through f.transactionClient and returns the XID from the response.
//
// A transport error is returned as is. A response whose result code is not
// apis.ResultCodeSuccess yields an error carrying the response message.
func (f *Filter) globalBegin(ctx context.Context, name string, timeout int32) (string, error) {
	request := &apis.GlobalBeginRequest{
		Addressing:      f.conf.Addressing,
		Timeout:         timeout,
		TransactionName: name,
	}

	resp, err := f.transactionClient.Begin(ctx, request)
	if err != nil {
		return "", err
	}
	if resp.ResultCode != apis.ResultCodeSuccess {
		// The message is data, not a format string: errors.New keeps any '%'
		// in it intact.
		return "", errors.New(resp.Message)
	}
	return resp.XID, nil
}
// globalCommit requests a commit of the global transaction xid, trying up to
// f.conf.CommitRetryCount times.
//
// The XID entry is removed from ctx.Params when the function returns, whatever
// the outcome. At least one attempt is always made, so a retry count of zero
// or less no longer skips the commit while still returning nil. When every
// attempt fails, the returned error wraps the last failure.
func (f *Filter) globalCommit(ctx *http.HttpContext, xid string) error {
	defer func() {
		delete(ctx.Params, XID)
	}()

	remaining := f.conf.CommitRetryCount
	if remaining < 1 {
		remaining = 1
	}

	for {
		status, err := f.commit(ctx.Ctx, xid)
		if err == nil {
			logger.Infof("[%s] commit status: %s", xid, status.String())
			return nil
		}

		logger.Errorf("failed to report global commit [%s],Retry Countdown: %d, reason: %s",
			xid, remaining, err.Error())
		remaining--
		if remaining == 0 {
			return errors.Wrap(err, "failed to report global commit")
		}
	}
}
// globalRollback requests a rollback of the global transaction xid, trying up
// to f.conf.RollbackRetryCount times.
//
// The XID entry is removed from ctx.Params when the function returns, whatever
// the outcome. At least one attempt is always made, so a retry count of zero
// or less no longer skips the rollback while still returning nil. When every
// attempt fails, the returned error wraps the last failure.
func (f *Filter) globalRollback(ctx *http.HttpContext, xid string) error {
	defer func() {
		delete(ctx.Params, XID)
	}()

	remaining := f.conf.RollbackRetryCount
	if remaining < 1 {
		remaining = 1
	}

	for {
		status, err := f.rollback(ctx.Ctx, xid)
		if err == nil {
			logger.Infof("[%s] rollback status: %s", xid, status.String())
			return nil
		}

		logger.Errorf("failed to report global rollback [%s],Retry Countdown: %d, reason: %s",
			xid, remaining, err.Error())
		remaining--
		if remaining == 0 {
			return errors.Wrap(err, "failed to report global rollback")
		}
	}
}
// commit issues a single GlobalCommitRequest for xid through
// f.transactionClient. It returns the global status from the response when the
// result code is apis.ResultCodeSuccess; otherwise it returns a zero status
// together with the transport error or an error built from the response
// message.
func (f *Filter) commit(ctx context.Context, xid string) (apis.GlobalSession_GlobalStatus, error) {
	resp, err := f.transactionClient.Commit(ctx, &apis.GlobalCommitRequest{XID: xid})
	switch {
	case err != nil:
		return 0, err
	case resp.ResultCode != apis.ResultCodeSuccess:
		return 0, errors.New(resp.Message)
	}
	return resp.GlobalStatus, nil
}
// rollback issues a single GlobalRollbackRequest for xid through
// f.transactionClient. It returns the global status from the response when the
// result code is apis.ResultCodeSuccess; otherwise it returns a zero status
// together with the transport error or an error built from the response
// message.
func (f *Filter) rollback(ctx context.Context, xid string) (apis.GlobalSession_GlobalStatus, error) {
	resp, err := f.transactionClient.Rollback(ctx, &apis.GlobalRollbackRequest{XID: xid})
	switch {
	case err != nil:
		return 0, err
	case resp.ResultCode != apis.ResultCodeSuccess:
		return 0, errors.New(resp.Message)
	}
	return resp.GlobalStatus, nil
}
// branchRegister sends a BranchRegisterRequest for the global transaction xid
// through f.resourceClient and returns the branch ID from the response.
//
// A transport error is returned as is. A response whose result code is not
// apis.ResultCodeSuccess yields an error carrying the response message.
func (f *Filter) branchRegister(ctx context.Context, xid string, resourceID string,
	branchType apis.BranchSession_BranchType, applicationData []byte, lockKeys string) (int64, error) {
	request := &apis.BranchRegisterRequest{
		Addressing:      f.conf.Addressing,
		XID:             xid,
		ResourceID:      resourceID,
		LockKey:         lockKeys,
		BranchType:      branchType,
		ApplicationData: applicationData,
	}

	resp, err := f.resourceClient.BranchRegister(ctx, request)
	if err != nil {
		return 0, err
	}
	if resp.ResultCode != apis.ResultCodeSuccess {
		// The message is data, not a format string: errors.New keeps any '%'
		// in it intact.
		return 0, errors.New(resp.Message)
	}
	return resp.BranchID, nil
}
// branchReport sends a BranchReportRequest with the given branch status for
// branch branchID of the global transaction xid through f.resourceClient.
//
// A transport error is returned as is. A response whose result code is
// apis.ResultCodeFailed yields an error carrying the response message. Any
// other result code is treated as success.
func (f *Filter) branchReport(ctx context.Context, xid string, branchID int64,
	branchType apis.BranchSession_BranchType, status apis.BranchSession_BranchStatus, applicationData []byte) error {
	request := &apis.BranchReportRequest{
		XID:             xid,
		BranchID:        branchID,
		BranchType:      branchType,
		BranchStatus:    status,
		ApplicationData: applicationData,
	}

	resp, err := f.resourceClient.BranchReport(ctx, request)
	if err != nil {
		return err
	}
	if resp.ResultCode == apis.ResultCodeFailed {
		// The message is data, not a format string: errors.New keeps any '%'
		// in it intact.
		return errors.New(resp.Message)
	}
	return nil
}