blob: 0fc5dcf055f8cbccff3b2fcd0b57d2408b80d94b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package transactions
import (
"container/list"
"context"
"io"
"os"
"os/signal"
"runtime"
"slices"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/apache/plc4x/plc4go/pkg/api/config"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
"github.com/apache/plc4x/plc4go/spi/utils"
)
var sharedExecutorInstance pool.Executor // shared instance
func init() {
sharedExecutorInstance = pool.NewFixedSizeExecutor(
runtime.NumCPU(),
100,
options.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers),
config.WithCustomLogger(zerolog.Nop()),
)
sharedExecutorInstance.Start()
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
sharedExecutorInstance.Stop()
}()
}
type RequestTransactionRunnable func(context.Context, RequestTransaction)
// RequestTransactionManager handles transactions
type RequestTransactionManager interface {
io.Closer
// CloseGraceful gives some time opposed to io.Closer
CloseGraceful(timeout time.Duration) error
// SetNumberOfConcurrentRequests sets the number of concurrent requests that will be sent out to a device
SetNumberOfConcurrentRequests(numberOfConcurrentRequests int)
// StartTransaction starts a RequestTransaction
StartTransaction() RequestTransaction
}
// NewRequestTransactionManager creates a new RequestTransactionManager
func NewRequestTransactionManager(numberOfConcurrentRequests int, _options ...options.WithOption) RequestTransactionManager {
extractTraceTransactionManagerTransactions, _ := options.ExtractTraceTransactionManagerTransactions(_options...)
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
rtm := &requestTransactionManager{
numberOfConcurrentRequests: numberOfConcurrentRequests,
currentTransactionId: 0,
workLog: *list.New(),
executor: sharedExecutorInstance,
traceTransactionManagerTransactions: extractTraceTransactionManagerTransactions || config.TraceTransactionManagerTransactions,
log: customLogger,
}
rtm.ctx, rtm.cancelCtx = context.WithCancel(context.Background())
for _, option := range _options {
switch option := option.(type) {
case *withCustomExecutor:
rtm.executor = option.executor
}
}
return rtm
}
// WithCustomExecutor sets a custom Executor for the RequestTransactionManager
func WithCustomExecutor(executor pool.Executor) options.WithOption {
return &withCustomExecutor{executor: executor}
}
///////////////////////////////////////
///////////////////////////////////////
//
// Internal section
//
type withCustomExecutor struct {
options.Option
executor pool.Executor
}
//go:generate go tool plc4xGenerator -type=requestTransactionManager
type requestTransactionManager struct {
runningRequests []*requestTransaction
runningRequestMutex sync.RWMutex
numberOfConcurrentRequests int // How many transactions are allowed to run at the same time?
currentTransactionId int32 // Assigns each request a Unique Transaction Id, especially important for failure handling
transactionMutex sync.RWMutex
workLog list.List `ignore:"true"` // Important, this is a FIFO Queue for Fairness! // TODO: no support for list yet
workLogMutex sync.RWMutex
executor pool.Executor
shutdown atomic.Bool // Indicates it this rtm is in shutdown
ctx context.Context `ignore:"true"`
cancelCtx context.CancelFunc `ignore:"true"`
traceTransactionManagerTransactions bool // flag set to true if it should trace transactions
log zerolog.Logger
}
//
// Internal section
//
///////////////////////////////////////
///////////////////////////////////////
func (r *requestTransactionManager) SetNumberOfConcurrentRequests(numberOfConcurrentRequests int) {
r.log.Info().Int("numberOfConcurrentRequests", numberOfConcurrentRequests).Msg("Setting new number of concurrent requests")
// If we reduced the number of concurrent requests and more requests are in-flight
// than should be, at least log a warning.
r.runningRequestMutex.Lock()
runningRequestLength := len(r.runningRequests)
if numberOfConcurrentRequests < runningRequestLength {
r.log.Warn().Msg("The number of concurrent requests was reduced and currently more requests are in flight.")
}
r.numberOfConcurrentRequests = numberOfConcurrentRequests
r.runningRequestMutex.Unlock()
// As we might have increased the number, try to send some more requests.
r.processWorklog()
}
func (r *requestTransactionManager) submitTransaction(transaction *requestTransaction) {
// Add this Request with the transaction i the work log
// Put Transaction into work log
r.workLogMutex.Lock()
r.workLog.PushFront(transaction)
r.workLogMutex.Unlock()
// Try to Process the work log
r.processWorklog()
}
func (r *requestTransactionManager) processWorklog() {
r.workLogMutex.RLock()
defer r.workLogMutex.RUnlock()
r.runningRequestMutex.Lock()
defer r.runningRequestMutex.Unlock()
r.log.Debug().
Int("workLogLen", r.workLog.Len()).
Int("numberOfConcurrentRequests", r.numberOfConcurrentRequests).
Msg("Processing work log with size of workLogLen (numberOfConcurrentRequests concurrent requests allowed)")
for len(r.runningRequests) < r.numberOfConcurrentRequests && r.workLog.Len() > 0 {
front := r.workLog.Front()
next := front.Value.(*requestTransaction)
r.log.Debug().
Stringer("next", next).
Int("nRunningRequests", len(r.runningRequests)).
Msg("Handling next. (Adding to running requests (length: nRunningRequests))")
r.runningRequests = append(r.runningRequests, next)
completionFuture := r.executor.Submit(r.ctx, next.transactionId, next.operation)
next.setCompletionFuture(completionFuture)
r.workLog.Remove(front)
}
}
func (r *requestTransactionManager) StartTransaction() RequestTransaction {
r.transactionMutex.Lock()
defer r.transactionMutex.Unlock()
currentTransactionId := r.currentTransactionId
r.currentTransactionId += 1
transactionLogger := r.log
if !r.traceTransactionManagerTransactions {
transactionLogger = zerolog.Nop()
}
transaction := newRequestTransaction(transactionLogger, r, currentTransactionId)
if r.shutdown.Load() {
transaction.completed = true
transaction.setCompletionFuture(&completedFuture{errors.New("request transaction manager in shutdown")})
}
return transaction
}
func (r *requestTransactionManager) getNumberOfActiveRequests() int {
r.runningRequestMutex.RLock()
defer r.runningRequestMutex.RUnlock()
return len(r.runningRequests)
}
func (r *requestTransactionManager) failRequest(transaction *requestTransaction, err error) error {
// Try to fail it!
transaction.getCompletionFuture().Cancel(true, err)
// End it
return r.endRequest(transaction)
}
func (r *requestTransactionManager) endRequest(transaction *requestTransaction) error {
r.runningRequestMutex.Lock()
transaction.log.Debug().Msg("Trying to find a existing transaction")
found := false
r.runningRequests = slices.DeleteFunc(r.runningRequests, func(runningRequest *requestTransaction) bool {
if runningRequest.transactionId == transaction.transactionId {
transaction.log.Debug().Msg("Found a existing transaction")
found = true
return true
}
return false
})
if !found {
return errors.New("Unknown Transaction or Transaction already finished!")
}
transaction.log.Debug().Msg("Removing the existing transaction transaction")
r.runningRequestMutex.Unlock()
// Process the workLog, a slot should be free now
transaction.log.Debug().Msg("Processing the workLog")
r.processWorklog()
return nil
}
func (r *requestTransactionManager) Close() error {
defer utils.StopWarn(r.log)()
return r.CloseGraceful(0)
}
func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
r.log.Debug().Dur("timeout", timeout).Msg("closing with a timeout")
r.shutdown.Store(true)
if timeout > 0 {
timer := time.NewTimer(timeout)
gracefulLoop:
for {
r.runningRequestMutex.RLock()
numberRunningRequest := len(r.runningRequests)
r.runningRequestMutex.RUnlock()
if numberRunningRequest == 0 {
break gracefulLoop
}
select {
case <-timer.C:
r.log.Warn().Dur("timeout", timeout).Msg("timeout after")
break gracefulLoop
default:
time.Sleep(10 * time.Millisecond)
}
}
}
r.transactionMutex.Lock()
defer r.transactionMutex.Unlock()
r.workLogMutex.Lock()
defer r.workLogMutex.Unlock()
r.runningRequestMutex.Lock()
defer r.runningRequestMutex.Unlock()
r.runningRequests = nil
if r.executor != sharedExecutorInstance {
if err := r.executor.Close(); err != nil {
return errors.Wrap(err, "error closing executor")
}
} else {
r.log.Warn().Msg("not closing shared instance")
}
r.cancelCtx()
r.log.Debug().Msg("closed")
return nil
}