feat: add collector_meta and CreatedDateAfter for jira (#3813)
* feat: add collector_meta and startFrom for jira
* fix: fix for linter
* feat: record state in framework
* fix: add some comment
* fix: only save state when collect success
* fix: change column name for review
diff --git a/models/collector_state.go b/models/collector_state.go
new file mode 100644
index 0000000..e0230a4
--- /dev/null
+++ b/models/collector_state.go
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import (
+ "time"
+)
+
+// CollectorLatestState records the state of the latest collection for one raw data scope.
+// A row is identified by the pair (RawDataParams, RawDataTable), both of which are tagged as primary key.
+type CollectorLatestState struct {
+ CreatedAt time.Time `json:"createdAt"`
+ UpdatedAt time.Time `json:"updatedAt"`
+ // RawDataParams and RawDataTable are the params/table of the raw data subtask this state belongs to.
+ RawDataParams string `gorm:"primaryKey;column:raw_data_params;type:varchar(255);index" json:"raw_data_params"`
+ RawDataTable string `gorm:"primaryKey;column:raw_data_table;type:varchar(255)" json:"raw_data_table"`
+ // CreatedDateAfter is the CreatedDateAfter bound of the run that was recorded; nil when that run had no bound.
+ CreatedDateAfter *time.Time
+ // LatestSuccessStart is set by ApiCollectorStateManager.Execute, only after the collector ran without error,
+ // to the time the state manager was created; nil when no successful run has been recorded.
+ LatestSuccessStart *time.Time
+}
+
+// TableName returns the table name "_devlake_collector_latest_state" for CollectorLatestState.
+// NOTE(review): presumably picked up by gorm's table-name convention — confirm.
+func (CollectorLatestState) TableName() string {
+ return "_devlake_collector_latest_state"
+}
diff --git a/models/migrationscripts/20221125_add_collector_state.go b/models/migrationscripts/20221125_add_collector_state.go
new file mode 100644
index 0000000..8aed5fc
--- /dev/null
+++ b/models/migrationscripts/20221125_add_collector_state.go
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+ "github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/helpers/migrationhelper"
+ "github.com/apache/incubator-devlake/models/migrationscripts/archived"
+ "github.com/apache/incubator-devlake/plugins/core"
+)
+
+// addCollectorMeta20221125 is the migration script that auto-migrates the table for archived.CollectorLatestState.
+type addCollectorMeta20221125 struct{}
+
+// Up runs the migration: it hands the archived CollectorLatestState model to the migration helper's
+// AutoMigrateTables and returns whatever error that call reports.
+func (*addCollectorMeta20221125) Up(basicRes core.BasicRes) errors.Error {
+	stateTable := new(archived.CollectorLatestState)
+	return migrationhelper.AutoMigrateTables(basicRes, stateTable)
+}
+
+// Version returns the version number of this migration script.
+// NOTE(review): the value looks like a date (2022-11-25) followed by a sequence number — confirm the scheme.
+func (*addCollectorMeta20221125) Version() uint64 {
+ return 20221125000038
+}
+
+// Name returns the human-readable description of this migration script.
+func (*addCollectorMeta20221125) Name() string {
+ return "save state for collector"
+}
diff --git a/models/migrationscripts/archived/collector_state.go b/models/migrationscripts/archived/collector_state.go
new file mode 100644
index 0000000..e710803
--- /dev/null
+++ b/models/migrationscripts/archived/collector_state.go
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package archived
+
+import (
+ "time"
+)
+
+// CollectorLatestState is the archived copy of the collector state model; it is the type handed to
+// AutoMigrateTables by the addCollectorMeta20221125 migration script.
+// A row is identified by the pair (RawDataParams, RawDataTable), both of which are tagged as primary key.
+type CollectorLatestState struct {
+ CreatedAt time.Time `json:"createdAt"`
+ UpdatedAt time.Time `json:"updatedAt"`
+ // RawDataParams and RawDataTable are the params/table of the raw data subtask this state belongs to.
+ RawDataParams string `gorm:"primaryKey;column:raw_data_params;type:varchar(255);index" json:"raw_data_params"`
+ RawDataTable string `gorm:"primaryKey;column:raw_data_table;type:varchar(255)" json:"raw_data_table"`
+ // CreatedDateAfter is the CreatedDateAfter bound of the run that was recorded; nil when that run had no bound.
+ CreatedDateAfter *time.Time
+ // LatestSuccessStart is the start time recorded for the latest successful run; nil when there is none.
+ LatestSuccessStart *time.Time
+}
+
+// TableName returns the table name "_devlake_collector_latest_state" for the archived CollectorLatestState.
+// NOTE(review): presumably picked up by gorm's table-name convention — confirm.
+func (CollectorLatestState) TableName() string {
+ return "_devlake_collector_latest_state"
+}
diff --git a/models/migrationscripts/register.go b/models/migrationscripts/register.go
index f9d1029..ad39df3 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -63,5 +63,6 @@
new(addLabels),
new(renameFiledsInProjectPrMetric),
new(addEnableToProjectMetric),
+ new(addCollectorMeta20221125),
}
}
diff --git a/plugins/core/plugin_task.go b/plugins/core/plugin_task.go
index ea09e98..5716f5b 100644
--- a/plugins/core/plugin_task.go
+++ b/plugins/core/plugin_task.go
@@ -104,7 +104,7 @@
PrepareTaskData(taskCtx TaskContext, options map[string]interface{}) (interface{}, errors.Error)
}
-// CloseablePluginTask Extends PluginTask, and invokes a Close method after all subtasks are done
+// CloseablePluginTask Extends PluginTask, and invokes a Close method after all subtasks are done or fail
type CloseablePluginTask interface {
PluginTask
Close(taskCtx TaskContext) errors.Error
diff --git a/plugins/helper/api_collector_with_state.go b/plugins/helper/api_collector_with_state.go
new file mode 100644
index 0000000..6404b75
--- /dev/null
+++ b/plugins/helper/api_collector_with_state.go
@@ -0,0 +1,94 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package helper
+
+import (
+ "github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/models"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
+ "gorm.io/gorm"
+ "time"
+)
+
+// ApiCollectorStateManager wraps an ApiCollector and saves the state of the latest successful
+// collection in the framework table (models.CollectorLatestState).
+type ApiCollectorStateManager struct {
+ // RawDataSubTaskArgs are the args given to NewApiCollectorWithState; InitCollector copies them into the collector args.
+ RawDataSubTaskArgs
+ // ApiCollector is the embedded collector; it is only assigned by InitCollector.
+ *ApiCollector
+ // LatestState is the state loaded from the database, or a fresh value carrying only table/params when no record exists.
+ LatestState models.CollectorLatestState
+ // CreatedDateAfter is the bound requested for the current run; nil when none was given.
+ CreatedDateAfter *time.Time
+ // ExecuteStart is the time at which this manager was created; Execute records it as LatestSuccessStart.
+ ExecuteStart time.Time
+}
+
+// NewApiCollectorWithState creates a new ApiCollectorStateManager.
+//
+// It resolves the raw data table and params from args, then loads the CollectorLatestState stored for that
+// (table, params) pair. When no record exists yet, a fresh state carrying only the table and params is used.
+// createdDateAfter is the bound requested for the current run and may be nil.
+// The embedded ApiCollector is NOT created here; call InitCollector before Execute.
+//
+// It returns an error when the subtask args cannot be resolved, or when loading the state fails for any
+// reason other than "record not found".
+func NewApiCollectorWithState(args RawDataSubTaskArgs, createdDateAfter *time.Time) (*ApiCollectorStateManager, errors.Error) {
+	db := args.Ctx.GetDal()
+
+	rawDataSubTask, err := NewRawDataSubTask(args)
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "Couldn't resolve raw subtask args")
+	}
+
+	latestState := models.CollectorLatestState{}
+	err = db.First(&latestState, dal.Where(`raw_data_table = ? AND raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
+	if err != nil {
+		if !errors.Is(err, gorm.ErrRecordNotFound) {
+			// this helper is shared by all plugins, so name the model that is actually being loaded
+			return nil, errors.Default.Wrap(err, "failed to load CollectorLatestState")
+		}
+		// first collection for this table/params: start from an empty state
+		latestState = models.CollectorLatestState{
+			RawDataTable:  rawDataSubTask.table,
+			RawDataParams: rawDataSubTask.params,
+		}
+	}
+
+	return &ApiCollectorStateManager{
+		RawDataSubTaskArgs: args,
+		LatestState:        latestState,
+		CreatedDateAfter:   createdDateAfter,
+		// captured now so that a successful Execute records when this run started
+		ExecuteStart: time.Now(),
+	}, nil
+}
+
+// CanIncrementCollect reports whether the previously collected data supports collecting incrementally.
+//
+// That is the case only when a previous collection succeeded (LatestState.LatestSuccessStart is set) and the
+// previous collection covers at least the range requested now:
+//   - LatestState.CreatedDateAfter == nil means all data has been collected, so any request is covered;
+//   - otherwise the current CreatedDateAfter must be set and must not be earlier than the one in LatestState.
+//
+// When no CreatedDateAfter is given for this run, incremental collection is therefore possible only if
+// LatestState.CreatedDateAfter is nil as well.
+func (m ApiCollectorStateManager) CanIncrementCollect() bool {
+	if m.LatestState.LatestSuccessStart == nil {
+		// nothing has been collected successfully yet
+		return false
+	}
+	previous := m.LatestState.CreatedDateAfter
+	if previous == nil {
+		return true
+	}
+	// An unchanged bound is still covered by the previous run, so compare with "not before" rather than
+	// "strictly after"; otherwise every run with a fixed CreatedDateAfter would be a full re-collection.
+	return m.CreatedDateAfter != nil && !m.CreatedDateAfter.Before(*previous)
+}
+
+// InitCollector builds the embedded ApiCollector from args. The RawDataSubTaskArgs inside args are
+// overwritten with the ones this manager was created with. The result of NewApiCollector is stored in
+// m.ApiCollector and its error, if any, is returned.
+func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs) (err errors.Error) {
+	args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
+	collector, err := NewApiCollector(args)
+	m.ApiCollector = collector
+	return err
+}
+
+// Execute runs the embedded collector and, only when it finishes without error, records the state of this
+// run: LatestSuccessStart becomes ExecuteStart and CreatedDateAfter becomes the bound used for this run.
+// The state is persisted with CreateOrUpdate; a collector error is returned as is and nothing is saved.
+func (m ApiCollectorStateManager) Execute() errors.Error {
+	if collectErr := m.ApiCollector.Execute(); collectErr != nil {
+		return collectErr
+	}
+
+	m.LatestState.LatestSuccessStart = &m.ExecuteStart
+	m.LatestState.CreatedDateAfter = m.CreatedDateAfter
+	return m.Ctx.GetDal().CreateOrUpdate(&m.LatestState)
+}
diff --git a/plugins/jira/impl/impl.go b/plugins/jira/impl/impl.go
index 29aaf36..2aff632 100644
--- a/plugins/jira/impl/impl.go
+++ b/plugins/jira/impl/impl.go
@@ -171,9 +171,9 @@
return nil, errors.Default.Wrap(err, "unable to get Jira connection")
}
- var since time.Time
- if op.Since != "" {
- since, err = time.Parse("2006-01-02T15:04:05Z", op.Since)
+ var createdDateAfter time.Time
+ if op.CreatedDateAfter != "" {
+ createdDateAfter, err = time.Parse("2006-01-02T15:04:05Z", op.CreatedDateAfter)
if err != nil {
return nil, errors.BadInput.Wrap(err, "invalid value for `since`")
}
@@ -208,10 +208,11 @@
ApiClient: jiraApiClient,
JiraServerInfo: *info,
}
- if !since.IsZero() {
- taskData.Since = &since
- logger.Debug("collect data updated since %s", since)
+ if !createdDateAfter.IsZero() {
+ taskData.CreatedDateAfter = &createdDateAfter
+ logger.Debug("collect data created from %s", createdDateAfter)
}
+
return taskData, nil
}
diff --git a/plugins/jira/jira.go b/plugins/jira/jira.go
index 0cf85de..dcb50c3 100644
--- a/plugins/jira/jira.go
+++ b/plugins/jira/jira.go
@@ -32,12 +32,12 @@
boardId := cmd.Flags().Uint64P("board", "b", 0, "jira board id")
_ = cmd.MarkFlagRequired("connection")
_ = cmd.MarkFlagRequired("board")
- since := cmd.Flags().StringP("since", "s", "", "collect data that are updated after specified time, ie 2006-05-06T07:08:09Z")
+ CreatedDateAfter := cmd.Flags().StringP("created_date_after", "a", "", "collect data that are updated after specified time, ie 2006-05-06T07:08:09Z")
cmd.Run = func(c *cobra.Command, args []string) {
runner.DirectRun(c, args, PluginEntry, map[string]interface{}{
- "connectionId": *connectionId,
- "boardId": *boardId,
- "since": *since,
+ "connectionId": *connectionId,
+ "boardId": *boardId,
+ "CreatedDateAfter": *CreatedDateAfter,
})
}
runner.RunCmd(cmd)
diff --git a/plugins/jira/tasks/epic_collector.go b/plugins/jira/tasks/epic_collector.go
index 78e6c31..ef4c270 100644
--- a/plugins/jira/tasks/epic_collector.go
+++ b/plugins/jira/tasks/epic_collector.go
@@ -53,12 +53,7 @@
if err != nil {
return err
}
- since := data.Since
jql := "ORDER BY created ASC"
- if since != nil {
- // prepend a time range criteria if `since` was specified, either by user or from database
- jql = fmt.Sprintf("updated >= '%s' %s", since.Format("2006/01/02 15:04"), jql)
- }
collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
Ctx: taskCtx,
diff --git a/plugins/jira/tasks/issue_changelog_collector.go b/plugins/jira/tasks/issue_changelog_collector.go
index dc3e6f1..30c47cd 100644
--- a/plugins/jira/tasks/issue_changelog_collector.go
+++ b/plugins/jira/tasks/issue_changelog_collector.go
@@ -52,6 +52,18 @@
log := taskCtx.GetLogger()
db := taskCtx.GetDal()
+ collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+ Ctx: taskCtx,
+ Params: JiraApiParams{
+ ConnectionId: data.Options.ConnectionId,
+ BoardId: data.Options.BoardId,
+ },
+ Table: RAW_CHANGELOG_TABLE,
+ }, data.CreatedDateAfter)
+ if err != nil {
+ return err
+ }
+
// query for issue_ids that needed changelog collection
clauses := []dal.Clause{
dal.Select("i.issue_id, i.updated AS update_time"),
@@ -60,12 +72,10 @@
dal.Join("LEFT JOIN _tool_jira_issue_changelogs c ON (c.connection_id = i.connection_id AND c.issue_id = i.issue_id)"),
dal.Where("i.updated > i.created AND bi.connection_id = ? AND bi.board_id = ? AND i.std_type != ? ", data.Options.ConnectionId, data.Options.BoardId, "Epic"),
dal.Groupby("i.issue_id, i.updated"),
- dal.Having("i.updated > max(c.issue_updated) OR (max(c.issue_updated) IS NULL AND COUNT(c.changelog_id) > 0)"),
}
- // apply time range if any
- since := data.Since
- if since != nil {
- clauses = append(clauses, dal.Where("i.updated > ?", *since))
+ incremental := collectorWithState.CanIncrementCollect()
+ if incremental {
+ clauses = append(clauses, dal.Having("i.updated > max(c.issue_updated) OR (max(c.issue_updated) IS NULL AND COUNT(c.changelog_id) > 0)"))
}
if log.IsLevelEnabled(core.LOG_DEBUG) {
@@ -88,18 +98,10 @@
}
// now, let ApiCollector takes care the rest
- collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
- RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
- Ctx: taskCtx,
- Params: JiraApiParams{
- ConnectionId: data.Options.ConnectionId,
- BoardId: data.Options.BoardId,
- },
- Table: RAW_CHANGELOG_TABLE,
- },
+ err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: since == nil,
+ Incremental: incremental,
GetTotalPages: GetTotalPagesFromResponse,
Input: iterator,
UrlTemplate: "api/3/issue/{{ .Input.IssueId }}/changelog",
@@ -127,5 +129,5 @@
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
diff --git a/plugins/jira/tasks/issue_collector.go b/plugins/jira/tasks/issue_collector.go
index 50db842..2120e6b 100644
--- a/plugins/jira/tasks/issue_collector.go
+++ b/plugins/jira/tasks/issue_collector.go
@@ -21,26 +21,20 @@
"encoding/json"
goerror "errors"
"fmt"
- "github.com/apache/incubator-devlake/errors"
+ "gorm.io/gorm"
"io"
"net/http"
"net/url"
+ "github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/jira/models"
- "gorm.io/gorm"
)
const RAW_ISSUE_TABLE = "jira_api_issues"
-// this struct should be moved to `jira_api_common.go`
-type JiraApiParams struct {
- ConnectionId uint64
- BoardId uint64
-}
-
var _ core.SubTaskEntryPoint = CollectIssues
var CollectIssuesMeta = core.SubTaskMeta{
@@ -55,10 +49,38 @@
db := taskCtx.GetDal()
data := taskCtx.GetData().(*JiraTaskData)
- since := data.Since
- incremental := false
- // user didn't specify a time range to sync, try load from database
- if since == nil {
+ collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+ Ctx: taskCtx,
+ /*
+ This struct will be JSON-encoded and stored in the database along with the raw data itself, to identify the minimal
+ set of data to be processed; for example, we process JiraIssues by Board
+ */
+ Params: JiraApiParams{
+ ConnectionId: data.Options.ConnectionId,
+ BoardId: data.Options.BoardId,
+ },
+ /*
+ Table store raw data
+ */
+ Table: RAW_ISSUE_TABLE,
+ }, data.CreatedDateAfter)
+ if err != nil {
+ return err
+ }
+
+ // build jql
+ // IMPORTANT: we have to keep paginated data in a consistent order to avoid missing data; if we sorted issues by
+ // `updated`, an issue would jump between pages whenever it got updated during the collection process
+ createdDateAfter := data.CreatedDateAfter
+ jql := "ORDER BY created ASC"
+ if createdDateAfter != nil {
+ // prepend a creation-time criterion if `createdDateAfter` was specified in the task options
+ jql = fmt.Sprintf("created >= '%v' %v", createdDateAfter.Format("2006/01/02 15:04"), jql)
+ }
+
+ incremental := collectorWithState.CanIncrementCollect()
+ if incremental {
+ // the previous collection can be reused: load the most recently updated issue from the database
var latestUpdated models.JiraIssue
clauses := []dal.Clause{
dal.Select("_tool_jira_issues.*"),
@@ -72,35 +94,13 @@
return errors.NotFound.Wrap(err, "failed to get latest jira issue record")
}
if latestUpdated.IssueId > 0 {
- since = &latestUpdated.Updated
- incremental = true
+ jql = fmt.Sprintf("updated >= '%v' %v", latestUpdated.Updated.Format("2006/01/02 15:04"), jql)
+ } else {
+ incremental = false
}
}
- // build jql
- // IMPORTANT: we have to keep paginated data in a consistence order to avoid data-missing, if we sort issues by
- // `updated`, issue will be jumping between pages if it got updated during the collection process
- jql := "ORDER BY created ASC"
- if since != nil {
- // prepend a time range criteria if `since` was specified, either by user or from database
- jql = fmt.Sprintf("updated >= '%v' %v", since.Format("2006/01/02 15:04"), jql)
- }
- collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
- RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
- Ctx: taskCtx,
- /*
- This struct will be JSONEncoded and stored into database along with raw data itself, to identity minimal
- set of data to be process, for example, we process JiraIssues by Board
- */
- Params: JiraApiParams{
- ConnectionId: data.Options.ConnectionId,
- BoardId: data.Options.BoardId,
- },
- /*
- Table store raw data
- */
- Table: RAW_ISSUE_TABLE,
- },
+ err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
Incremental: incremental,
@@ -159,10 +159,9 @@
return data.Issues, nil
},
})
-
if err != nil {
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
diff --git a/plugins/jira/tasks/issue_extractor.go b/plugins/jira/tasks/issue_extractor.go
index bf6f623..7f61c52 100644
--- a/plugins/jira/tasks/issue_extractor.go
+++ b/plugins/jira/tasks/issue_extractor.go
@@ -111,7 +111,7 @@
if issue.ResolutionDate != nil {
issue.LeadTimeMinutes = uint(issue.ResolutionDate.Unix()-issue.Created.Unix()) / 60
}
- if data.Options.TransformationRules.StoryPointField != "" {
+ if data.Options.TransformationRules != nil && data.Options.TransformationRules.StoryPointField != "" {
unknownStoryPoint := apiIssue.Fields.AllFields[data.Options.TransformationRules.StoryPointField]
switch sp := unknownStoryPoint.(type) {
case string:
@@ -193,9 +193,11 @@
}
stdTypeMappings := make(map[string]string)
standardStatusMappings := make(map[string]StatusMappings)
- for userType, stdType := range data.Options.TransformationRules.TypeMappings {
- stdTypeMappings[userType] = strings.ToUpper(stdType.StandardType)
- standardStatusMappings[userType] = stdType.StatusMappings
+ if data.Options.TransformationRules != nil {
+ for userType, stdType := range data.Options.TransformationRules.TypeMappings {
+ stdTypeMappings[userType] = strings.ToUpper(stdType.StandardType)
+ standardStatusMappings[userType] = stdType.StatusMappings
+ }
}
return &typeMappings{
typeIdMappings: typeIdMapping,
diff --git a/plugins/jira/tasks/remotelink_collector.go b/plugins/jira/tasks/remotelink_collector.go
index b641f9d..4867225 100644
--- a/plugins/jira/tasks/remotelink_collector.go
+++ b/plugins/jira/tasks/remotelink_collector.go
@@ -47,6 +47,18 @@
logger := taskCtx.GetLogger()
logger.Info("collect remotelink")
+ collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+ Ctx: taskCtx,
+ Params: JiraApiParams{
+ ConnectionId: data.Options.ConnectionId,
+ BoardId: data.Options.BoardId,
+ },
+ Table: RAW_REMOTELINK_TABLE,
+ }, data.CreatedDateAfter)
+ if err != nil {
+ return err
+ }
+
clauses := []dal.Clause{
dal.Select("i.issue_id, i.updated AS update_time"),
dal.From("_tool_jira_board_issues bi"),
@@ -54,12 +66,14 @@
dal.Join("LEFT JOIN _tool_jira_remotelinks rl ON (rl.connection_id = i.connection_id AND rl.issue_id = i.issue_id)"),
dal.Where("i.updated > i.created AND bi.connection_id = ? AND bi.board_id = ? ", data.Options.ConnectionId, data.Options.BoardId),
dal.Groupby("i.issue_id, i.updated"),
- dal.Having("i.updated > max(rl.issue_updated) OR max(rl.issue_updated) IS NULL"),
}
- // apply time range if any
- since := data.Since
- if since != nil {
- clauses = append(clauses, dal.Where("i.updated > ?", *since))
+ incremental := collectorWithState.CanIncrementCollect()
+ if incremental {
+ if collectorWithState.LatestState.LatestSuccessStart != nil {
+ clauses = append(clauses, dal.Having("i.updated > ? AND (i.updated > max(rl.issue_updated) OR max(rl.issue_updated) IS NULL)", collectorWithState.LatestState.LatestSuccessStart))
+ } else {
+ clauses = append(clauses, dal.Having("i.updated > max(rl.issue_updated) OR max(rl.issue_updated) IS NULL"))
+ }
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -73,18 +87,10 @@
return err
}
- collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
- RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
- Ctx: taskCtx,
- Params: JiraApiParams{
- ConnectionId: data.Options.ConnectionId,
- BoardId: data.Options.BoardId,
- },
- Table: RAW_REMOTELINK_TABLE,
- },
+ err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
Input: iterator,
- Incremental: since == nil,
+ Incremental: incremental,
UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/remotelink",
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
if res.StatusCode == http.StatusNotFound {
@@ -102,7 +108,7 @@
if err != nil {
return err
}
- err = collector.Execute()
+ err = collectorWithState.Execute()
if err != nil {
return err
}
diff --git a/plugins/jira/tasks/remotelink_extractor.go b/plugins/jira/tasks/remotelink_extractor.go
index b857e26..0418a1a 100644
--- a/plugins/jira/tasks/remotelink_extractor.go
+++ b/plugins/jira/tasks/remotelink_extractor.go
@@ -45,7 +45,8 @@
logger.Info("extract remote links")
var commitShaRegex *regexp.Regexp
var err error
- if pattern := data.Options.TransformationRules.RemotelinkCommitShaPattern; pattern != "" {
+ if data.Options.TransformationRules != nil && data.Options.TransformationRules.RemotelinkCommitShaPattern != "" {
+ pattern := data.Options.TransformationRules.RemotelinkCommitShaPattern
commitShaRegex, err = regexp.Compile(pattern)
if err != nil {
return errors.Default.Wrap(err, "regexp Compile pattern failed")
diff --git a/plugins/jira/tasks/task_data.go b/plugins/jira/tasks/task_data.go
index b4bf770..1e99812 100644
--- a/plugins/jira/tasks/task_data.go
+++ b/plugins/jira/tasks/task_data.go
@@ -94,17 +94,22 @@
type JiraOptions struct {
ConnectionId uint64 `json:"connectionId"`
BoardId uint64 `json:"boardId"`
- Since string
+ CreatedDateAfter string
TransformationRules *JiraTransformationRule `json:"transformationRules"`
ScopeId string
TransformationRuleId uint64
}
type JiraTaskData struct {
- Options *JiraOptions
- ApiClient *helper.ApiAsyncClient
- Since *time.Time
- JiraServerInfo models.JiraServerInfo
+ Options *JiraOptions
+ ApiClient *helper.ApiAsyncClient
+ CreatedDateAfter *time.Time
+ JiraServerInfo models.JiraServerInfo
+}
+
+type JiraApiParams struct {
+ ConnectionId uint64
+ BoardId uint64
}
func DecodeAndValidateTaskOptions(options map[string]interface{}) (*JiraOptions, errors.Error) {
diff --git a/plugins/jira/tasks/worklog_collector.go b/plugins/jira/tasks/worklog_collector.go
index 0c3ce7a..3958a81 100644
--- a/plugins/jira/tasks/worklog_collector.go
+++ b/plugins/jira/tasks/worklog_collector.go
@@ -42,10 +42,21 @@
func CollectWorklogs(taskCtx core.SubTaskContext) errors.Error {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*JiraTaskData)
- since := data.Since
logger := taskCtx.GetLogger()
+ collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+ Ctx: taskCtx,
+ Params: JiraApiParams{
+ ConnectionId: data.Options.ConnectionId,
+ BoardId: data.Options.BoardId,
+ },
+ Table: RAW_WORKLOGS_TABLE,
+ }, data.CreatedDateAfter)
+ if err != nil {
+ return err
+ }
+
// filter out issue_ids that needed collection
clauses := []dal.Clause{
dal.Select("i.issue_id, i.updated AS update_time"),
@@ -54,11 +65,10 @@
dal.Join("LEFT JOIN _tool_jira_worklogs wl ON (wl.connection_id = i.connection_id AND wl.issue_id = i.issue_id)"),
dal.Where("i.updated > i.created AND bi.connection_id = ? AND bi.board_id = ? ", data.Options.ConnectionId, data.Options.BoardId),
dal.Groupby("i.issue_id, i.updated"),
- dal.Having("i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND COUNT(wl.worklog_id) > 0)"),
}
- // apply time range if any
- if since != nil {
- clauses = append(clauses, dal.Where("i.updated > ?", *since))
+ incremental := collectorWithState.CanIncrementCollect()
+ if incremental {
+ clauses = append(clauses, dal.Having("i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND COUNT(wl.worklog_id) > 0)"))
}
// construct the input iterator
@@ -71,20 +81,12 @@
return err
}
- collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
- RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
- Ctx: taskCtx,
- Params: JiraApiParams{
- ConnectionId: data.Options.ConnectionId,
- BoardId: data.Options.BoardId,
- },
- Table: RAW_WORKLOGS_TABLE,
- },
+ err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
Input: iterator,
ApiClient: data.ApiClient,
UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/worklog",
PageSize: 50,
- Incremental: since == nil,
+ Incremental: incremental,
GetTotalPages: GetTotalPagesFromResponse,
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
var data struct {
@@ -103,5 +105,5 @@
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}