blob: 1c2544c7a0a5453f15c832369815cbb9bbf57ac3 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package services
import (
"encoding/json"
"fmt"
"strings"
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/models"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/go-playground/validator/v10"
"github.com/robfig/cron/v3"
"gorm.io/gorm"
)
// BlueprintQuery FIXME ...
type BlueprintQuery struct {
Enable *bool `form:"enable,omitempty"`
Page int `form:"page"`
PageSize int `form:"pageSize"`
}
var (
blueprintLog = logger.Global.Nested("blueprint")
vld = validator.New()
)
// CreateBlueprint accepts a Blueprint instance and insert it to database
func CreateBlueprint(blueprint *models.Blueprint) error {
err := validateBlueprint(blueprint)
if err != nil {
return err
}
err = db.Create(&blueprint).Error
if err != nil {
return err
}
err = ReloadBlueprints(cronManager)
if err != nil {
return errors.InternalError
}
return nil
}
// GetBlueprints returns a paginated list of Blueprints based on `query`
func GetBlueprints(query *BlueprintQuery) ([]*models.Blueprint, int64, error) {
blueprints := make([]*models.Blueprint, 0)
db := db.Model(blueprints).Order("id DESC")
if query.Enable != nil {
db = db.Where("enable = ?", *query.Enable)
}
var count int64
err := db.Count(&count).Error
if err != nil {
return nil, 0, err
}
if query.Page > 0 && query.PageSize > 0 {
offset := query.PageSize * (query.Page - 1)
db = db.Limit(query.PageSize).Offset(offset)
}
err = db.Find(&blueprints).Error
if err != nil {
return nil, 0, err
}
return blueprints, count, nil
}
// GetBlueprint returns the detail of a given Blueprint ID
func GetBlueprint(blueprintId uint64) (*models.Blueprint, error) {
blueprint := &models.Blueprint{}
err := db.First(blueprint, blueprintId).Error
if err != nil {
if err == gorm.ErrRecordNotFound {
return nil, errors.NewNotFound("blueprint not found")
}
return nil, err
}
return blueprint, nil
}
func validateBlueprint(blueprint *models.Blueprint) error {
// validation
err := vld.Struct(blueprint)
if err != nil {
return err
}
if strings.ToLower(blueprint.CronConfig) == "manual" {
blueprint.IsManual = true
}
if !blueprint.IsManual {
_, err = cron.ParseStandard(blueprint.CronConfig)
if err != nil {
return fmt.Errorf("invalid cronConfig: %w", err)
}
}
if blueprint.Mode == models.BLUEPRINT_MODE_ADVANCED {
plan := make(core.PipelinePlan, 0)
err = json.Unmarshal(blueprint.Plan, &plan)
if err != nil {
return fmt.Errorf("invalid plan: %w", err)
}
// tasks should not be empty
if len(plan) == 0 || len(plan[0]) == 0 {
return fmt.Errorf("empty plan")
}
} else if blueprint.Mode == models.BLUEPRINT_MODE_NORMAL {
blueprint.Plan, err = GeneratePlanJson(blueprint.Settings)
if err != nil {
return fmt.Errorf("invalid plan: %w", err)
}
}
return nil
}
// PatchBlueprint FIXME ...
func PatchBlueprint(id uint64, body map[string]interface{}) (*models.Blueprint, error) {
// load record from db
blueprint, err := GetBlueprint(id)
if err != nil {
return nil, err
}
originMode := blueprint.Mode
err = helper.DecodeMapStruct(body, blueprint)
if err != nil {
return nil, err
}
// make sure mode is not being update
if originMode != blueprint.Mode {
return nil, fmt.Errorf("mode is not updatable")
}
// validation
err = validateBlueprint(blueprint)
if err != nil {
return nil, err
}
// save
err = db.Save(blueprint).Error
if err != nil {
return nil, errors.InternalError
}
// reload schedule
err = ReloadBlueprints(cronManager)
if err != nil {
return nil, errors.InternalError
}
// done
return blueprint, nil
}
// DeleteBlueprint FIXME ...
func DeleteBlueprint(id uint64) error {
err := db.Delete(&models.Blueprint{}, "id = ?", id).Error
if err != nil {
return errors.InternalError
}
err = ReloadBlueprints(cronManager)
if err != nil {
return errors.InternalError
}
return nil
}
// ReloadBlueprints FIXME ...
func ReloadBlueprints(c *cron.Cron) error {
blueprints := make([]*models.Blueprint, 0)
err := db.Model(&models.Blueprint{}).
Where("enable = ? AND is_manual = ?", true, false).
Find(&blueprints).Error
if err != nil {
panic(err)
}
for _, e := range c.Entries() {
c.Remove(e.ID)
}
c.Stop()
for _, pp := range blueprints {
blueprint := pp
plan, err := pp.UnmarshalPlan()
if err != nil {
blueprintLog.Error("created cron job failed: %s", err)
return err
}
_, err = c.AddFunc(pp.CronConfig, func() {
pipeline, err := createPipelineByBlueprint(blueprint.ID, blueprint.Name, plan)
if err != nil {
blueprintLog.Error("run cron job failed: %s", err)
} else {
blueprintLog.Info("Run new cron job successfully, pipeline id: %d", pipeline.ID)
}
})
if err != nil {
blueprintLog.Error("created cron job failed: %s", err)
return err
}
}
if len(blueprints) > 0 {
c.Start()
}
log.Info("total %d blueprints were scheduled", len(blueprints))
return nil
}
func createPipelineByBlueprint(blueprintId uint64, name string, plan core.PipelinePlan) (*models.Pipeline, error) {
newPipeline := models.NewPipeline{}
newPipeline.Plan = plan
newPipeline.Name = name
newPipeline.BlueprintId = blueprintId
pipeline, err := CreatePipeline(&newPipeline)
// Return all created tasks to the User
if err != nil {
blueprintLog.Error("created cron job failed: %s", err)
return nil, err
}
return pipeline, err
}
// GeneratePlanJson generates pipeline plan by version
func GeneratePlanJson(settings json.RawMessage) (json.RawMessage, error) {
bpSettings := new(models.BlueprintSettings)
err := json.Unmarshal(settings, bpSettings)
if err != nil {
fmt.Println(string(settings))
return nil, err
}
var plan interface{}
switch bpSettings.Version {
case "1.0.0":
plan, err = GeneratePlanJsonV100(bpSettings)
default:
return nil, fmt.Errorf("unknown version of blueprint settings: %s", bpSettings.Version)
}
if err != nil {
return nil, err
}
return json.Marshal(plan)
}
// GeneratePlanJsonV100 generates pipeline plan according v1.0.0 definition
func GeneratePlanJsonV100(settings *models.BlueprintSettings) (core.PipelinePlan, error) {
connections := make([]*core.BlueprintConnectionV100, 0)
err := json.Unmarshal(settings.Connections, &connections)
if err != nil {
return nil, err
}
plans := make([]core.PipelinePlan, len(connections))
for i, connection := range connections {
if len(connection.Scope) == 0 {
return nil, fmt.Errorf("connections[%d].scope is empty", i)
}
plugin, err := core.GetPlugin(connection.Plugin)
if err != nil {
return nil, err
}
if pluginBp, ok := plugin.(core.PluginBlueprintV100); ok {
plans[i], err = pluginBp.MakePipelinePlan(connection.ConnectionId, connection.Scope)
if err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("plugin %s does not support blueprint protocol version 1.0.0", connection.Plugin)
}
}
mergedPipelinePlan := MergePipelinePlans(plans...)
return FormatPipelinePlans(settings.BeforePlan, mergedPipelinePlan, settings.AfterPlan)
}
// FormatPipelinePlans merges multiple pipelines and append before and after pipeline
func FormatPipelinePlans(beforePlanJson json.RawMessage, mainPlan core.PipelinePlan, afterPlanJson json.RawMessage) (core.PipelinePlan, error) {
newPipelinePlan := core.PipelinePlan{}
if beforePlanJson != nil {
beforePipelinePlan := core.PipelinePlan{}
err := json.Unmarshal(beforePlanJson, &beforePipelinePlan)
if err != nil {
return nil, err
}
newPipelinePlan = append(newPipelinePlan, beforePipelinePlan...)
}
newPipelinePlan = append(newPipelinePlan, mainPlan...)
if afterPlanJson != nil {
afterPipelinePlan := core.PipelinePlan{}
err := json.Unmarshal(afterPlanJson, &afterPipelinePlan)
if err != nil {
return nil, err
}
newPipelinePlan = append(newPipelinePlan, afterPipelinePlan...)
}
return newPipelinePlan, nil
}
// MergePipelinePlans merges multiple pipelines into one unified pipeline
func MergePipelinePlans(plans ...core.PipelinePlan) core.PipelinePlan {
merged := make(core.PipelinePlan, 0)
// iterate all pipelineTasks and try to merge them into `merged`
for _, plan := range plans {
// add all stages from plan to merged
for index, stage := range plan {
if index >= len(merged) {
merged = append(merged, nil)
}
// add all tasks from plan to target respectively
merged[index] = append(merged[index], stage...)
}
}
return merged
}
// TriggerBlueprint triggers blueprint immediately
func TriggerBlueprint(id uint64) (*models.Pipeline, error) {
// load record from db
blueprint, err := GetBlueprint(id)
if err != nil {
return nil, err
}
plan, err := blueprint.UnmarshalPlan()
if err != nil {
return nil, err
}
pipeline, err := createPipelineByBlueprint(blueprint.ID, blueprint.Name, plan)
// done
return pipeline, err
}