blob: aed4440a4792db2552f47c57ca5fca91861996e6 [file] [log] [blame]
package main // must be main for plugin entry point
import (
"context"
"fmt"
"os"
"strconv"
"github.com/merico-dev/lake/config"
errors "github.com/merico-dev/lake/errors"
"github.com/merico-dev/lake/logger" // A pseudo type for Plugin Interface implementation
lakeModels "github.com/merico-dev/lake/models"
"github.com/merico-dev/lake/plugins/core"
"github.com/merico-dev/lake/plugins/gitlab/api"
"github.com/merico-dev/lake/plugins/gitlab/models"
"github.com/merico-dev/lake/plugins/gitlab/tasks"
"github.com/merico-dev/lake/utils"
"github.com/mitchellh/mapstructure"
)
var _ core.Plugin = (*Gitlab)(nil)
type GitlabOptions struct {
Tasks []string `json:"tasks,omitempty"`
}
type Gitlab string
func (plugin Gitlab) Description() string {
return "To collect and enrich data from Gitlab"
}
func (plugin Gitlab) Init() {
logger.Info("INFO >>> init go plugin", true)
err := lakeModels.Db.AutoMigrate(
&models.GitlabProject{},
&models.GitlabMergeRequest{},
&models.GitlabCommit{},
&models.GitlabTag{},
&models.GitlabProjectCommit{},
&models.GitlabPipeline{},
&models.GitlabReviewer{},
&models.GitlabMergeRequestNote{},
&models.GitlabMergeRequestCommit{},
&models.GitlabUser{},
)
if err != nil {
logger.Error("Error migrating gitlab: ", err)
panic(err)
}
}
func (plugin Gitlab) Execute(options map[string]interface{}, progress chan<- float32, ctx context.Context) error {
logger.Print("start gitlab plugin execution")
rateLimitPerSecondInt, err := core.GetRateLimitPerSecond(options, 10)
if err != nil {
return err
}
// GitLab's authenticated api rate limit is 2000 per min
// 30 tasks/min 60s/min = 1800 per min < 2000 per min
// You would think this would work but it hits the rate limit every time. I have to play with the number to see the right way to set it
scheduler, err := utils.NewWorkerScheduler(50, rateLimitPerSecondInt, ctx)
defer scheduler.Release()
if err != nil {
return fmt.Errorf("could not create scheduler")
}
projectId, ok := options["projectId"]
if !ok {
return fmt.Errorf("projectId is required for gitlab execution")
}
projectIdInt := int(projectId.(float64))
if projectIdInt < 0 {
return fmt.Errorf("projectId is invalid")
}
var op GitlabOptions
err = mapstructure.Decode(options, &op)
if err != nil {
return err
}
tasksToRun := make(map[string]bool, len(op.Tasks))
for _, task := range op.Tasks {
tasksToRun[task] = true
}
if len(tasksToRun) == 0 {
tasksToRun = map[string]bool{
"collectPipelines": true,
"collectCommits": true,
"CollectTags": true,
"collectMrs": true,
"collectMrNotes": true,
"collectMrCommits": true,
"enrichMrs": true,
"convertProjects": true,
"convertMrs": true,
"convertCommits": true,
"convertMrsCommits": true,
"convertNotes": true,
}
}
gitlabApiClient := tasks.CreateApiClient(scheduler)
err = gitlabApiClient.SetProxy(config.GetConfig().GetString("GITLAB_PROXY"))
if err != nil {
return err
}
progress <- 0.1
if err := tasks.CollectProject(ctx, projectIdInt, gitlabApiClient); err != nil {
return fmt.Errorf("could not collect projects: %v", err)
}
if tasksToRun["collectCommits"] {
progress <- 0.25
if err := tasks.CollectCommits(ctx, projectIdInt, gitlabApiClient); err != nil {
return &errors.SubTaskError{
SubTaskName: "collectCommits",
Message: fmt.Errorf("could not collect commits: %v", err).Error(),
}
}
}
if tasksToRun["collectTags"] {
progress <- 0.3
if err := tasks.CollectTags(ctx, projectIdInt, gitlabApiClient); err != nil {
return &errors.SubTaskError{
SubTaskName: "collectTags",
Message: fmt.Errorf("could not collect tags: %v", err).Error(),
}
}
}
if tasksToRun["collectMrs"] {
progress <- 0.35
mergeRequestErr := tasks.CollectMergeRequests(ctx, projectIdInt, gitlabApiClient)
if mergeRequestErr != nil {
return &errors.SubTaskError{
SubTaskName: "collectMrs",
Message: fmt.Errorf("could not collect merge requests: %v", mergeRequestErr).Error(),
}
}
}
if tasksToRun["collectMrNotes"] {
progress <- 0.4
err = tasks.CollectMergeRequestNotes(ctx, projectIdInt, rateLimitPerSecondInt, gitlabApiClient)
if err != nil {
return &errors.SubTaskError{
SubTaskName: "collectMrNotes",
Message: fmt.Errorf("could not collect merge request notes: %v", err).Error(),
}
}
}
if tasksToRun["collectMrCommits"] {
progress <- 0.45
err = tasks.CollectMergeRequestCommits(ctx, projectIdInt, rateLimitPerSecondInt, gitlabApiClient)
if err != nil {
return &errors.SubTaskError{
SubTaskName: "collectMrCommits",
Message: fmt.Errorf("could not collect merge request commits: %v", err).Error(),
}
}
}
if tasksToRun["enrichMrs"] {
progress <- 0.5
enrichErr := tasks.EnrichMergeRequests(ctx, projectIdInt)
if enrichErr != nil {
return &errors.SubTaskError{
SubTaskName: "enrichMrs",
Message: fmt.Errorf("could not enrich merge requests: %v", enrichErr).Error(),
}
}
}
if tasksToRun["collectPipelines"] {
progress <- 0.6
if err := tasks.CollectAllPipelines(projectIdInt, gitlabApiClient); err != nil {
return &errors.SubTaskError{
SubTaskName: "collectPipelines",
Message: fmt.Errorf("could not collect pipelines: %v", err).Error(),
}
}
if err := tasks.CollectChildrenOnPipelines(projectIdInt, gitlabApiClient); err != nil {
return &errors.SubTaskError{
SubTaskName: "collectChildrenOnPipelines",
Message: fmt.Errorf("could not collect children pipelines: %v", err).Error(),
}
}
}
if tasksToRun["convertProjects"] {
progress <- 0.7
err = tasks.ConvertProjects(ctx, projectIdInt)
if err != nil {
return &errors.SubTaskError{
SubTaskName: "convertProjects",
Message: err.Error(),
}
}
}
if tasksToRun["convertMrs"] {
progress <- 0.75
err = tasks.ConvertMrs(ctx, projectIdInt)
if err != nil {
return &errors.SubTaskError{
SubTaskName: "convertMrs",
Message: err.Error(),
}
}
}
if tasksToRun["convertCommits"] {
progress <- 0.8
err = tasks.ConvertCommits(projectIdInt)
if err != nil {
return &errors.SubTaskError{
SubTaskName: "convertCommits",
Message: err.Error(),
}
}
}
if tasksToRun["convertMrsCommits"] {
progress <- 0.85
err = tasks.ConvertMergeRequestCommits(projectIdInt)
if err != nil {
return &errors.SubTaskError{
SubTaskName: "convertMrsCommits",
Message: err.Error(),
}
}
}
if tasksToRun["convertNotes"] {
progress <- 0.9
err = tasks.ConvertNotes(ctx, projectIdInt)
if err != nil {
return &errors.SubTaskError{
SubTaskName: "convertNotes",
Message: err.Error(),
}
}
}
progress <- 1
return nil
}
func (plugin Gitlab) RootPkgPath() string {
return "github.com/merico-dev/lake/plugins/gitlab"
}
func (plugin Gitlab) ApiResources() map[string]map[string]core.ApiResourceHandler {
return map[string]map[string]core.ApiResourceHandler{
"test": {
"POST": api.TestConnection,
},
"sources": {
"GET": api.ListSources,
"POST": api.PutSource,
},
"sources/:sourceId": {
"GET": api.GetSource,
"PUT": api.PutSource,
},
}
}
// Export a variable named PluginEntry for Framework to search and load
var PluginEntry Gitlab //nolint
// standalone mode for debugging
func main() {
args := os.Args[1:]
if len(args) < 1 {
panic(fmt.Errorf("usage: go run ./plugins/gitlab <project_id>"))
}
projectId, err := strconv.ParseFloat(args[0], 64)
if err != nil {
panic(fmt.Errorf("error paring board_id: %w", err))
}
err = core.RegisterPlugin("gitlab", PluginEntry)
if err != nil {
panic(err)
}
PluginEntry.Init()
progress := make(chan float32)
go func() {
err2 := PluginEntry.Execute(
map[string]interface{}{
"projectId": projectId,
"tasks": []string{
//"collectMrCommits",
//"enrichMrs",
"convertProjects",
"convertMrs",
"convertCommits",
"convertNotes",
},
},
progress,
context.Background(),
)
if err2 != nil {
panic(err2)
}
close(progress)
}()
for p := range progress {
fmt.Println(p)
}
}