blob: 7f5bed64c9926514f50ba2e915b3bd93673bd201 [file] [log] [blame]
package tasks
import (
"encoding/json"
"net/http"
"reflect"
"github.com/merico-dev/lake/plugins/core"
"github.com/merico-dev/lake/plugins/helper"
"github.com/merico-dev/lake/plugins/jira/tasks/apiv2models"
)
const RAW_WORKLOGS_TABLE = "jira_api_worklogs"
func CollectWorklogs(taskCtx core.SubTaskContext) error {
db := taskCtx.GetDb()
data := taskCtx.GetData().(*JiraTaskData)
since := data.Since
logger := taskCtx.GetLogger()
connectionId := data.Connection.ID
boardId := data.Options.BoardId
tx := db.Table("_tool_jira_board_issues bi").
Select("bi.issue_id, NOW() AS update_time").
Joins("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)").
Where("bi.connection_id = ? AND bi.board_id = ? AND (i.worklog_updated IS NULL OR i.worklog_updated < i.updated)", connectionId, boardId)
if since != nil {
tx = tx.Where("i.updated > ?", since)
}
cursor, err := tx.Rows()
if err != nil {
return err
}
iterator, err := helper.NewCursorIterator(db, cursor, reflect.TypeOf(apiv2models.Input{}))
if err != nil {
return err
}
collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: JiraApiParams{
ConnectionId: data.Connection.ID,
BoardId: data.Options.BoardId,
},
Table: RAW_WORKLOGS_TABLE,
},
Input: iterator,
ApiClient: data.ApiClient,
UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/worklog",
PageSize: 50,
Incremental: true,
GetTotalPages: GetTotalPagesFromResponse,
ResponseParser: func(res *http.Response) ([]json.RawMessage, error) {
var data struct {
Worklogs []json.RawMessage `json:"worklogs"`
}
err := helper.UnmarshalResponse(res, &data)
if err != nil {
return nil, err
}
return data.Worklogs, nil
},
})
if err != nil {
logger.Error("collect board error:", err)
return err
}
return collector.Execute()
}