blob: aec8a22ccc399bdf7819cdd26b912ebd07270755 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package services
import (
"encoding/json"
"fmt"
"strings"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/plugin"
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/impls/logruslog"
"github.com/robfig/cron/v3"
)
// BlueprintQuery is a query for GetBlueprints
type BlueprintQuery struct {
Pagination
Enable *bool `form:"enable,omitempty"`
IsManual *bool `form:"isManual"`
Label string `form:"label"`
}
var (
blueprintLog = logruslog.Global.Nested("blueprint")
)
type BlueprintJob struct {
Blueprint *models.Blueprint
}
func (bj BlueprintJob) Run() {
blueprint := bj.Blueprint
pipeline, err := createPipelineByBlueprint(blueprint)
if err != nil {
blueprintLog.Error(err, fmt.Sprintf("run cron job failed on blueprint:[%d][%s]", blueprint.ID, blueprint.Name))
} else {
blueprintLog.Info("Run new cron job successfully,blueprint id:[%d] pipeline id:[%d]", blueprint.ID, pipeline.ID)
}
}
// CreateBlueprint accepts a Blueprint instance and insert it to database
func CreateBlueprint(blueprint *models.Blueprint) errors.Error {
err := validateBlueprintAndMakePlan(blueprint)
if err != nil {
return err
}
err = SaveDbBlueprint(blueprint)
if err != nil {
return err
}
err = ReloadBlueprints(cronManager)
if err != nil {
return errors.Internal.Wrap(err, "error reloading blueprints")
}
return nil
}
// GetBlueprints returns a paginated list of Blueprints based on `query`
func GetBlueprints(query *BlueprintQuery) ([]*models.Blueprint, int64, errors.Error) {
blueprints, count, err := GetDbBlueprints(query)
if err != nil {
return nil, 0, errors.Convert(err)
}
return blueprints, count, nil
}
// GetBlueprint returns the detail of a given Blueprint ID
func GetBlueprint(blueprintId uint64) (*models.Blueprint, errors.Error) {
blueprint, err := GetDbBlueprint(blueprintId)
if err != nil {
if db.IsErrorNotFound(err) {
return nil, errors.NotFound.New("blueprint not found")
}
return nil, errors.Internal.Wrap(err, "error getting the blueprint from database")
}
return blueprint, nil
}
// GetBlueprintByProjectName returns the detail of a given ProjectName
func GetBlueprintByProjectName(projectName string) (*models.Blueprint, errors.Error) {
if projectName == "" {
return nil, errors.Internal.New("can not use the empty projectName to search the unique blueprint")
}
blueprint, err := GetDbBlueprintByProjectName(projectName)
if err != nil {
// Allow specific projectName to fail to find the corresponding blueprint
if db.IsErrorNotFound(err) {
return nil, nil
}
return nil, errors.Internal.Wrap(err, fmt.Sprintf("error getting the blueprint from database with project %s", projectName))
}
return blueprint, nil
}
func validateBlueprintAndMakePlan(blueprint *models.Blueprint) errors.Error {
if len(blueprint.Settings) == 0 {
blueprint.Settings = nil
}
// validation
err := vld.Struct(blueprint)
if err != nil {
return errors.BadInput.WrapRaw(err)
}
// checking if the project exist
if blueprint.ProjectName != "" {
_, err := GetProject(blueprint.ProjectName)
if err != nil {
return errors.Default.Wrap(err, fmt.Sprintf("invalid projectName: [%s] for the blueprint [%s]", blueprint.ProjectName, blueprint.Name))
}
bp, err := GetBlueprintByProjectName(blueprint.ProjectName)
if err != nil {
return err
}
if bp != nil {
if bp.ID != blueprint.ID {
return errors.Default.New(fmt.Sprintf("Each project can only be used by one blueprint. The currently selected projectName: [%s] has been used by blueprint: [id:%d] [name:%s] and cannot be reused.", bp.ProjectName, bp.ID, bp.Name))
}
}
}
if strings.ToLower(blueprint.CronConfig) == "manual" {
blueprint.IsManual = true
}
if !blueprint.IsManual {
_, err = cron.ParseStandard(blueprint.CronConfig)
if err != nil {
return errors.Default.Wrap(err, "invalid cronConfig")
}
}
if blueprint.Mode == models.BLUEPRINT_MODE_ADVANCED {
plan := make(plugin.PipelinePlan, 0)
err = errors.Convert(json.Unmarshal(blueprint.Plan, &plan))
if err != nil {
return errors.Default.Wrap(err, "invalid plan")
}
// tasks should not be empty
if len(plan) == 0 || len(plan[0]) == 0 {
return errors.Default.New("empty plan")
}
} else if blueprint.Mode == models.BLUEPRINT_MODE_NORMAL {
plan, err := MakePlanForBlueprint(blueprint)
if err != nil {
return errors.Default.Wrap(err, "make plan for blueprint failed")
}
blueprint.Plan, err = errors.Convert01(json.Marshal(plan))
if err != nil {
return errors.Default.Wrap(err, "failed to markshal plan")
}
}
return nil
}
func saveBlueprint(blueprint *models.Blueprint) (*models.Blueprint, errors.Error) {
// validation
err := validateBlueprintAndMakePlan(blueprint)
if err != nil {
return nil, errors.BadInput.WrapRaw(err)
}
err = SaveDbBlueprint(blueprint)
if err != nil {
return nil, err
}
// reload schedule
err = ReloadBlueprints(cronManager)
if err != nil {
return nil, errors.Internal.Wrap(err, "error reloading blueprints")
}
// done
return blueprint, nil
}
// PatchBlueprint FIXME ...
func PatchBlueprint(id uint64, body map[string]interface{}) (*models.Blueprint, errors.Error) {
// load record from db
blueprint, err := GetBlueprint(id)
if err != nil {
return nil, err
}
originMode := blueprint.Mode
err = helper.DecodeMapStruct(body, blueprint, true)
if err != nil {
return nil, err
}
// make sure mode is not being update
if originMode != blueprint.Mode {
return nil, errors.Default.New("mode is not updatable")
}
blueprint, err = saveBlueprint(blueprint)
if err != nil {
return nil, err
}
return blueprint, nil
}
// ReloadBlueprints FIXME ...
func ReloadBlueprints(c *cron.Cron) errors.Error {
enable := true
isManual := false
blueprints, _, err := GetDbBlueprints(&BlueprintQuery{Enable: &enable, IsManual: &isManual})
if err != nil {
return err
}
for _, e := range c.Entries() {
c.Remove(e.ID)
}
c.Stop()
for _, blueprint := range blueprints {
if err != nil {
blueprintLog.Error(err, failToCreateCronJob)
return err
}
blueprintLog.Info("Add blueprint id:[%d] cronConfg[%s] to cron job", blueprint.ID, blueprint.CronConfig)
blueprintJob := &BlueprintJob{
Blueprint: blueprint,
}
if _, err := c.AddJob(blueprint.CronConfig, blueprintJob); err != nil {
blueprintLog.Error(err, failToCreateCronJob)
return errors.Default.Wrap(err, "created cron job failed")
}
}
if len(blueprints) > 0 {
c.Start()
}
logger.Info("total %d blueprints were scheduled", len(blueprints))
return nil
}
func createPipelineByBlueprint(blueprint *models.Blueprint) (*models.Pipeline, errors.Error) {
var plan plugin.PipelinePlan
var err errors.Error
if blueprint.Mode == models.BLUEPRINT_MODE_NORMAL {
plan, err = MakePlanForBlueprint(blueprint)
} else {
plan, err = blueprint.UnmarshalPlan()
}
if err != nil {
blueprintLog.Error(err, fmt.Sprintf("failed to MakePlanForBlueprint on blueprint:[%d][%s]", blueprint.ID, blueprint.Name))
return nil, err
}
newPipeline := models.NewPipeline{}
newPipeline.Plan = plan
newPipeline.Name = blueprint.Name
newPipeline.BlueprintId = blueprint.ID
newPipeline.Labels = blueprint.Labels
newPipeline.SkipOnFail = blueprint.SkipOnFail
pipeline, err := CreatePipeline(&newPipeline)
// Return all created tasks to the User
if err != nil {
blueprintLog.Error(err, fmt.Sprintf("%s on blueprint:[%d][%s]", failToCreateCronJob, blueprint.ID, blueprint.Name))
return nil, errors.Convert(err)
}
return pipeline, nil
}
// MakePlanForBlueprint generates pipeline plan by version
func MakePlanForBlueprint(blueprint *models.Blueprint) (plugin.PipelinePlan, errors.Error) {
bpSettings := new(models.BlueprintSettings)
err := errors.Convert(json.Unmarshal(blueprint.Settings, bpSettings))
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("settings:%s", string(blueprint.Settings)))
}
bpSyncPolicy := plugin.BlueprintSyncPolicy{}
bpSyncPolicy.TimeAfter = bpSettings.TimeAfter
var plan plugin.PipelinePlan
switch bpSettings.Version {
case "1.0.0":
// Notice: v1 not complete SkipOnFail & TimeAfter
plan, err = GeneratePlanJsonV100(bpSettings)
case "2.0.0":
// load project metric plugins and convert it to a map
metrics := make(map[string]json.RawMessage)
projectMetrics := make([]models.ProjectMetricSetting, 0)
if blueprint.ProjectName != "" {
err = db.All(&projectMetrics, dal.Where("project_name = ? AND enable = ?", blueprint.ProjectName, true))
if err != nil {
return nil, err
}
for _, projectMetric := range projectMetrics {
metrics[projectMetric.PluginName] = json.RawMessage(projectMetric.PluginOption)
}
}
plan, err = GeneratePlanJsonV200(blueprint.ProjectName, bpSyncPolicy, bpSettings, metrics)
default:
return nil, errors.Default.New(fmt.Sprintf("unknown version of blueprint settings: %s", bpSettings.Version))
}
if err != nil {
return nil, err
}
return WrapPipelinePlans(bpSettings.BeforePlan, plan, bpSettings.AfterPlan)
}
// WrapPipelinePlans merges multiple pipelines and append before and after pipeline
func WrapPipelinePlans(beforePlanJson json.RawMessage, mainPlan plugin.PipelinePlan, afterPlanJson json.RawMessage) (plugin.PipelinePlan, errors.Error) {
beforePipelinePlan := plugin.PipelinePlan{}
afterPipelinePlan := plugin.PipelinePlan{}
if beforePlanJson != nil {
err := errors.Convert(json.Unmarshal(beforePlanJson, &beforePipelinePlan))
if err != nil {
return nil, err
}
}
if afterPlanJson != nil {
err := errors.Convert(json.Unmarshal(afterPlanJson, &afterPipelinePlan))
if err != nil {
return nil, err
}
}
return SequencializePipelinePlans(beforePipelinePlan, mainPlan, afterPipelinePlan), nil
}
// ParallelizePipelinePlans merges multiple pipelines into one unified plan
// by assuming they can be executed in parallel
func ParallelizePipelinePlans(plans ...plugin.PipelinePlan) plugin.PipelinePlan {
merged := make(plugin.PipelinePlan, 0)
// iterate all pipelineTasks and try to merge them into `merged`
for _, plan := range plans {
// add all stages from plan to merged
for index, stage := range plan {
if index >= len(merged) {
merged = append(merged, nil)
}
// add all tasks from plan to target respectively
merged[index] = append(merged[index], stage...)
}
}
return merged
}
// SequencializePipelinePlans merges multiple pipelines into one unified plan
// by assuming they must be executed in sequencial order
func SequencializePipelinePlans(plans ...plugin.PipelinePlan) plugin.PipelinePlan {
merged := make(plugin.PipelinePlan, 0)
// iterate all pipelineTasks and try to merge them into `merged`
for _, plan := range plans {
merged = append(merged, plan...)
}
return merged
}
// TriggerBlueprint triggers blueprint immediately
func TriggerBlueprint(id uint64) (*models.Pipeline, errors.Error) {
// load record from db
blueprint, err := GetBlueprint(id)
if err != nil {
return nil, err
}
pipeline, err := createPipelineByBlueprint(blueprint)
// done
return pipeline, err
}