blob: 5a816f54f597e0c6a0c89b40e67a57b4a375de7d [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package srvhelper
import (
"fmt"
"time"
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/log"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/helpers/dbhelper"
"github.com/go-playground/validator/v10"
)
type CustomValidator interface {
CustomValidate(entity interface{}, validate *validator.Validate) errors.Error
}
type ModelInfo interface {
New() any
NewSlice() any
ModelName() string
TableName() string
}
type AnyModelSrvHelper struct {
ModelInfo
basicRes context.BasicRes
log log.Logger
db dal.Dal
validator *validator.Validate
pk []dal.ColumnMeta
pkWhere string
pkCount int
searchColumns []string
}
func NewAnyModelSrvHelper(basicRes context.BasicRes, modelInfo ModelInfo, searchColumns []string) *AnyModelSrvHelper {
db := basicRes.GetDal()
if db == nil {
db = basicRes.GetDal()
}
pk := errors.Must1(dal.GetPrimarykeyColumns(db, modelInfo.TableName()))
pkWhere := ""
for _, col := range pk {
if pkWhere != "" {
pkWhere += " AND "
}
pkWhere += fmt.Sprintf("%s = ? ", col.Name())
}
return &AnyModelSrvHelper{
ModelInfo: modelInfo,
basicRes: basicRes,
log: basicRes.GetLogger().Nested(fmt.Sprintf("%s_dal", modelInfo.ModelName())),
db: db,
validator: validator.New(),
pk: pk,
pkWhere: pkWhere,
pkCount: len(pk),
searchColumns: searchColumns,
}
}
func (srv *AnyModelSrvHelper) ValidateModel(model any) errors.Error {
// the model can validate itself
if customValidator, ok := (interface{}(model)).(CustomValidator); ok {
return customValidator.CustomValidate(model, srv.validator)
}
// basic validator
if e := srv.validator.Struct(model); e != nil {
return errors.BadInput.Wrap(e, "validation faild")
}
return nil
}
// Create validates given model and insert it into database if validation passed
func (srv *AnyModelSrvHelper) CreateAny(model any) errors.Error {
err := srv.ValidateModel(model)
if err != nil {
return err
}
err = srv.db.Create(model)
if err != nil {
if srv.db.IsDuplicationError(err) {
return errors.Default.New("The name of the current scope config is duplicated. Please modify it before saving.")
}
return err
}
return err
}
// Update validates given model and update it into database if validation passed
func (srv *AnyModelSrvHelper) UpdateAny(model any) errors.Error {
err := srv.ValidateModel(model)
if err != nil {
if srv.db.IsDuplicationError(err) {
return errors.Default.New("The name of the current scope config is duplicated. Please modify it before saving.")
}
return err
}
return srv.db.Update(model, dal.From(srv.ModelInfo.TableName()))
}
// CreateOrUpdate validates given model and insert or update it into database if validation passed
func (srv *AnyModelSrvHelper) CreateOrUpdateAny(model any) errors.Error {
err := srv.ValidateModel(model)
if err != nil {
return err
}
return srv.db.CreateOrUpdate(model)
}
// DeleteModel deletes given model from database
func (srv *AnyModelSrvHelper) DeleteModelAny(model any) errors.Error {
return srv.db.Delete(model)
}
// FindByPk returns model with given primary key from database
func (srv *AnyModelSrvHelper) FindByPkAny(pk ...interface{}) (any, errors.Error) {
if len(pk) != srv.pkCount {
return nil, errors.BadInput.New("invalid primary key")
}
model := srv.New()
err := srv.db.First(model, dal.From(srv.TableName()), dal.Where(srv.pkWhere, pk...))
if err != nil {
if srv.db.IsErrorNotFound(err) {
return nil, errors.NotFound.Wrap(err, fmt.Sprintf("%s not found", srv.ModelName()))
}
return nil, err
}
return model, nil
}
// GetAll returns all models from database
func (srv *AnyModelSrvHelper) QueryAllAny() (any, errors.Error) {
array := srv.NewSlice()
return array, srv.db.All(&array, dal.From(srv.ModelInfo.TableName()))
}
func (srv *AnyModelSrvHelper) QueryPageAny(pagination *Pagination, query ...dal.Clause) (any, int64, errors.Error) {
query = append(query, dal.From(srv.TableName()))
// process keyword
searchTerm := pagination.SearchTerm
if searchTerm != "" && len(srv.searchColumns) > 0 {
sql := ""
value := "%" + searchTerm + "%"
values := make([]interface{}, len(srv.searchColumns))
for i, field := range srv.searchColumns {
if sql != "" {
sql += " OR "
}
sql += fmt.Sprintf("%s LIKE ?", field)
values[i] = value
}
sql = fmt.Sprintf("(%s)", sql)
query = append(query,
dal.Where(sql, values...),
)
}
count, err := srv.db.Count(query...)
if err != nil {
return nil, 0, err
}
query = append(query, dal.Limit(pagination.GetLimit()), dal.Offset(pagination.GetOffset()))
array := srv.NewSlice()
return array, count, srv.db.All(&array, query...)
}
func (srv *AnyModelSrvHelper) NoRunningPipeline(fn func(tx dal.Transaction) errors.Error, tablesToLock ...*dal.LockTable) (err errors.Error) {
// make sure no pipeline is running
tablesToLock = append(tablesToLock, &dal.LockTable{Table: "_devlake_pipelines", Exclusive: true})
txHelper := dbhelper.NewTxHelper(srv.basicRes, &err)
defer txHelper.End()
tx := txHelper.Begin()
err = txHelper.LockTablesTimeout(2*time.Second, tablesToLock)
if err != nil {
err = errors.Conflict.Wrap(err, "lock pipelines table timedout")
return
}
count := errors.Must1(tx.Count(
dal.From("_devlake_pipelines"),
dal.Where("status = ?", models.TASK_RUNNING),
))
if count > 0 {
err = errors.Conflict.New("at least one pipeline is running")
return
}
// time.Sleep(1 * time.Minute) # uncomment this line if you were to verify pipelines get blocked while deleting data
// creating a nested transaction to avoid mysql complaining about table(s) NOT being locked
nextedTxHelper := dbhelper.NewTxHelper(srv.basicRes, &err)
defer nextedTxHelper.End()
nestedTX := nextedTxHelper.Begin()
err = fn(nestedTX)
return
}