blob: 3c100d7dd2e8cb883feb9c46e49de7b29f28776e [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tasks
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"reflect"
"strconv"
"time"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models/common"
"github.com/apache/incubator-devlake/plugins/gitlab/models"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
)
const (
// https://docs.gitlab.com/ee/api/pipelines.html#list-project-pipelines
// https://docs.gitlab.com/ee/api/deployments.html#list-project-deployments
// https://docs.gitlab.com/ee/api/jobs.html#list-project-jobs
StatusSuccess = "success"
StatusFailed = "failed"
StatusCanceled = "canceled"
StatusRunning = "running"
StatusWaitingForResource = "waiting_for_resource"
StatusPending = "pending"
StatusPreparing = "preparing"
StatusCreated = "created"
StatusManual = "manual"
StatusScheduled = "scheduled"
StatusSkipped = "skipped"
StatusBlocked = "blocked"
// the following two status are handle in codes, but cannot be seen in documents.
StatusCompleted = "COMPLETED"
StatusUndeployed = "UNDEPLOYED"
)
type GitlabInput struct {
GitlabId int
Iid int
}
func GetTotalPagesFromResponse(res *http.Response, args *api.ApiCollectorArgs) (int, errors.Error) {
total := res.Header.Get("X-Total-Pages")
if total == "" {
return 0, api.ErrUndetermined
}
totalInt, err := strconv.Atoi(total)
if err != nil {
return 0, errors.Convert(err)
}
return totalInt, nil
}
func GetRawMessageFromResponse(res *http.Response) ([]json.RawMessage, errors.Error) {
rawMessages := []json.RawMessage{}
if res == nil {
return nil, errors.Default.New("res is nil")
}
defer res.Body.Close()
resBody, err := io.ReadAll(res.Body)
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("error reading response from %s", res.Request.URL.String()))
}
err = errors.Convert(json.Unmarshal(resBody, &rawMessages))
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("error decoding response from %s. raw response was: %s", res.Request.URL.String(), string(resBody)))
}
return rawMessages, nil
}
func GetOneRawMessageFromResponse(res *http.Response) ([]json.RawMessage, errors.Error) {
rawMessage := json.RawMessage{}
if res == nil {
return nil, errors.Default.New("res is nil")
}
defer res.Body.Close()
resBody, err := io.ReadAll(res.Body)
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("error reading response from %s", res.Request.URL.String()))
}
err = errors.Convert(json.Unmarshal(resBody, &rawMessage))
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("error decoding response from %s. raw response was: %s", res.Request.URL.String(), string(resBody)))
}
return []json.RawMessage{rawMessage}, nil
}
func GetRawMessageUpdatedAtAfter(timeAfter *time.Time) func(res *http.Response) ([]json.RawMessage, errors.Error) {
type ApiModel struct {
UpdatedAt *common.Iso8601Time `json:"updated_at"`
}
return func(res *http.Response) ([]json.RawMessage, errors.Error) {
rawMessages, err := GetRawMessageFromResponse(res)
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("error reading response from %s", res.Request.URL.String()))
}
isFinish := true
filterRawMessages := []json.RawMessage{}
for _, rawMessage := range rawMessages {
apiModel := &ApiModel{}
err = errors.Convert(json.Unmarshal(rawMessage, apiModel))
if err != nil {
return nil, err
}
if timeAfter == nil || timeAfter.Before(apiModel.UpdatedAt.ToTime()) {
// only finish when all items are created before `createDateAfter`
// because gitlab's order may not strict enough
isFinish = false
filterRawMessages = append(filterRawMessages, rawMessage)
}
}
if isFinish {
return filterRawMessages, api.ErrFinishCollect
}
return filterRawMessages, nil
}
}
func GetQuery(reqData *api.RequestData) (url.Values, errors.Error) {
query := url.Values{}
query.Set("with_stats", "true")
query.Set("sort", "asc")
query.Set("page", strconv.Itoa(reqData.Pager.Page))
query.Set("per_page", strconv.Itoa(reqData.Pager.Size))
return query, nil
}
func CreateRawDataSubTaskArgs(subtaskCtx plugin.SubTaskContext, Table string) (*api.RawDataSubTaskArgs, *GitlabTaskData) {
data := subtaskCtx.GetData().(*GitlabTaskData)
rawDataSubTaskArgs := &api.RawDataSubTaskArgs{
Ctx: subtaskCtx,
Params: models.GitlabApiParams{
ProjectId: data.Options.ProjectId,
ConnectionId: data.Options.ConnectionId,
},
Table: Table,
}
return rawDataSubTaskArgs, data
}
func CreateSubtaskCommonArgs(subtaskCtx plugin.SubTaskContext, table string) (*api.SubtaskCommonArgs, *GitlabTaskData) {
data := subtaskCtx.GetData().(*GitlabTaskData)
args := &api.SubtaskCommonArgs{
SubTaskContext: subtaskCtx,
Table: table,
Params: models.GitlabApiParams{
ConnectionId: data.Options.ConnectionId,
ProjectId: data.Options.ProjectId,
},
}
return args, data
}
func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector *api.StatefulApiCollector) (*api.DalCursorIterator, errors.Error) {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*GitlabTaskData)
clauses := []dal.Clause{
dal.Select("gmr.gitlab_id, gmr.iid"),
dal.From("_tool_gitlab_merge_requests gmr"),
// collect only openning merge request's notes and commits to speed up the process
dal.Where(
`gmr.project_id = ? and gmr.connection_id = ?`,
data.Options.ProjectId, data.Options.ConnectionId,
),
}
if apiCollector != nil {
if apiCollector.GetSince() != nil {
clauses = append(clauses, dal.Where("gitlab_updated_at > ?", *apiCollector.GetSince()))
}
}
// construct the input iterator
cursor, err := db.Cursor(clauses...)
if err != nil {
return nil, err
}
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(GitlabInput{}))
}