fix: improve Jira changelog and worklog collection (#1960)
* fix: improve Jira changelog and worklog collection
* fix: fix missing IsRemoved reset in sprint issues converter
diff --git a/plugins/jira/jira.go b/plugins/jira/jira.go
index 31e8292..561b14d 100644
--- a/plugins/jira/jira.go
+++ b/plugins/jira/jira.go
@@ -127,7 +127,12 @@
}
func (plugin Jira) MigrationScripts() []migration.Script {
- return []migration.Script{new(migrationscripts.InitSchemas), new(migrationscripts.UpdateSchemas20220505), new(migrationscripts.UpdateSchemas20220507)}
+ return []migration.Script{
+ new(migrationscripts.InitSchemas),
+ new(migrationscripts.UpdateSchemas20220505),
+ new(migrationscripts.UpdateSchemas20220507),
+ new(migrationscripts.UpdateSchemas20220518),
+ }
}
func (plugin Jira) ApiResources() map[string]map[string]core.ApiResourceHandler {
diff --git a/plugins/jira/models/issue.go b/plugins/jira/models/issue.go
index 97cfaf1..ceb960a 100644
--- a/plugins/jira/models/issue.go
+++ b/plugins/jira/models/issue.go
@@ -49,6 +49,7 @@
// internal status tracking
ChangelogUpdated *time.Time
RemotelinkUpdated *time.Time
+ WorklogUpdated *time.Time
common.NoPKModel
}
diff --git a/plugins/jira/models/migrationscripts/update_schemas20220518.go b/plugins/jira/models/migrationscripts/update_schemas20220518.go
new file mode 100644
index 0000000..4e947bb
--- /dev/null
+++ b/plugins/jira/models/migrationscripts/update_schemas20220518.go
@@ -0,0 +1,79 @@
+package migrationscripts
+
+import (
+ "context"
+ "time"
+
+ "gorm.io/datatypes"
+ "gorm.io/gorm"
+
+ "github.com/merico-dev/lake/models/migrationscripts/archived"
+)
+
+type JiraIssue20220518 struct {
+ // collected fields
+ SourceId uint64 `gorm:"primaryKey"`
+ IssueId uint64 `gorm:"primarykey"`
+ ProjectId uint64
+ Self string `gorm:"type:varchar(255)"`
+ IconURL string `gorm:"type:varchar(255);column:icon_url"`
+ Key string `gorm:"type:varchar(255)"`
+ Summary string
+ Type string `gorm:"type:varchar(255)"`
+ EpicKey string `gorm:"type:varchar(255)"`
+ StatusName string `gorm:"type:varchar(255)"`
+ StatusKey string `gorm:"type:varchar(255)"`
+ StoryPoint float64
+ OriginalEstimateMinutes int64 // user input?
+ AggregateEstimateMinutes int64 // sum up of all subtasks?
+ RemainingEstimateMinutes int64 // could it be negative value?
+ CreatorAccountId string `gorm:"type:varchar(255)"`
+ CreatorAccountType string `gorm:"type:varchar(255)"`
+ CreatorDisplayName string `gorm:"type:varchar(255)"`
+ AssigneeAccountId string `gorm:"type:varchar(255);comment:latest assignee"`
+ AssigneeAccountType string `gorm:"type:varchar(255)"`
+ AssigneeDisplayName string `gorm:"type:varchar(255)"`
+ PriorityId uint64
+ PriorityName string `gorm:"type:varchar(255)"`
+ ParentId uint64
+ ParentKey string `gorm:"type:varchar(255)"`
+ SprintId uint64 // latest sprint, issue might cross multiple sprints, would be addressed by #514
+ SprintName string `gorm:"type:varchar(255)"`
+ ResolutionDate *time.Time
+ Created time.Time
+ Updated time.Time `gorm:"index"`
+ SpentMinutes int64
+ LeadTimeMinutes uint
+ StdStoryPoint uint
+ StdType string `gorm:"type:varchar(255)"`
+ StdStatus string `gorm:"type:varchar(255)"`
+ AllFields datatypes.JSONMap
+
+ // internal status tracking
+ ChangelogUpdated *time.Time
+ RemotelinkUpdated *time.Time
+ WorklogUpdated *time.Time
+ archived.NoPKModel
+}
+
+func (JiraIssue20220518) TableName() string {
+ return "_tool_jira_issues"
+}
+
+type UpdateSchemas20220518 struct{}
+
+func (*UpdateSchemas20220518) Up(ctx context.Context, db *gorm.DB) error {
+ err := db.Migrator().AddColumn(&JiraIssue20220518{}, "worklog_updated")
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (*UpdateSchemas20220518) Version() uint64 {
+ return 20220518132510
+}
+
+func (*UpdateSchemas20220518) Name() string {
+ return "Add worklog_updated column to JiraIssue"
+}
diff --git a/plugins/jira/tasks/changelog_collector.go b/plugins/jira/tasks/changelog_collector.go
index d6f265d..9ef753d 100644
--- a/plugins/jira/tasks/changelog_collector.go
+++ b/plugins/jira/tasks/changelog_collector.go
@@ -25,19 +25,6 @@
db := taskCtx.GetDb()
// figure out the time range
since := data.Since
- incremental := false
- if since == nil {
- // user didn't specify a time range to sync, try load from database
- var latestUpdated models.JiraChangelog
- err := db.Where("connection_id = ?", data.Connection.ID).Order("created DESC").Limit(1).Find(&latestUpdated).Error
- if err != nil {
- return fmt.Errorf("failed to get latest jira changelog record: %w", err)
- }
- if latestUpdated.ChangelogId > 0 {
- since = &latestUpdated.Created
- incremental = true
- }
- }
// filter out issue_ids that needed collection
tx := db.Table("_tool_jira_board_issues bi").
@@ -73,7 +60,7 @@
},
ApiClient: data.ApiClient,
PageSize: 50,
- Incremental: incremental,
+ Incremental: true,
Input: iterator,
UrlTemplate: "api/3/issue/{{ .Input.IssueId }}/changelog",
Query: func(reqData *helper.RequestData) (url.Values, error) {
diff --git a/plugins/jira/tasks/sprint_issues_convertor.go b/plugins/jira/tasks/sprint_issues_convertor.go
index d2ae823..e422780 100644
--- a/plugins/jira/tasks/sprint_issues_convertor.go
+++ b/plugins/jira/tasks/sprint_issues_convertor.go
@@ -232,10 +232,12 @@
return nil
}
if item, ok := c.sprintIssue[key]; ok {
- if item != nil && (item.AddedDate == nil || item.AddedDate != nil && item.AddedDate.After(cl.Created)) {
- item.AddedDate = &cl.Created
- item.AddedStage = addedStage
+ if item != nil {
item.IsRemoved = false
+ if item.AddedDate == nil || item.AddedDate.After(cl.Created) {
+ item.AddedDate = &cl.Created
+ item.AddedStage = addedStage
+ }
}
} else {
c.sprintIssue[key] = &ticket.SprintIssue{
@@ -336,7 +338,7 @@
}
func (c *SprintIssuesConverter) getStage(t time.Time, connectionId, sprintId uint64) (*string, error) {
- sprint, err := c.getJiraSprint(sprintId, connectionId)
+ sprint, err := c.getJiraSprint(connectionId, sprintId)
if err != nil {
return nil, err
}
diff --git a/plugins/jira/tasks/worklog_collector.go b/plugins/jira/tasks/worklog_collector.go
index 033f448..7f5bed6 100644
--- a/plugins/jira/tasks/worklog_collector.go
+++ b/plugins/jira/tasks/worklog_collector.go
@@ -2,13 +2,11 @@
import (
"encoding/json"
- "fmt"
"net/http"
"reflect"
"github.com/merico-dev/lake/plugins/core"
"github.com/merico-dev/lake/plugins/helper"
- "github.com/merico-dev/lake/plugins/jira/models"
"github.com/merico-dev/lake/plugins/jira/tasks/apiv2models"
)
@@ -18,29 +16,17 @@
db := taskCtx.GetDb()
data := taskCtx.GetData().(*JiraTaskData)
since := data.Since
- incremental := false
-
- if since == nil {
- var latestUpdated models.JiraWorklog
- err := db.Where("connection_id = ?", data.Connection.ID).Order("updated DESC").Limit(1).Find(&latestUpdated).Error
- if err != nil {
- return fmt.Errorf("failed to get latest jira issue worklog record: %w", err)
- }
- if latestUpdated.IssueId > 0 {
- since = &latestUpdated.Updated
- incremental = true
- }
- }
logger := taskCtx.GetLogger()
connectionId := data.Connection.ID
boardId := data.Options.BoardId
- tx := db.Model(&models.JiraIssue{}).
- Joins("left join _tool_jira_board_issues on _tool_jira_issues.issue_id = _tool_jira_board_issues.issue_id").
- Select("_tool_jira_board_issues.issue_id").Where("_tool_jira_board_issues.connection_id = ? AND _tool_jira_board_issues.board_id = ?", connectionId, boardId)
+ tx := db.Table("_tool_jira_board_issues bi").
+ Select("bi.issue_id, NOW() AS update_time").
+ Joins("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)").
+ Where("bi.connection_id = ? AND bi.board_id = ? AND (i.worklog_updated IS NULL OR i.worklog_updated < i.updated)", connectionId, boardId)
if since != nil {
- tx = tx.Where("_tool_jira_issues.updated > ?", since)
+ tx = tx.Where("i.updated > ?", since)
}
cursor, err := tx.Rows()
if err != nil {
@@ -64,7 +50,7 @@
ApiClient: data.ApiClient,
UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/worklog",
PageSize: 50,
- Incremental: incremental,
+ Incremental: true,
GetTotalPages: GetTotalPagesFromResponse,
ResponseParser: func(res *http.Response) ([]json.RawMessage, error) {
var data struct {
diff --git a/plugins/jira/tasks/worklog_extractor.go b/plugins/jira/tasks/worklog_extractor.go
index 4f0820c..e9b0bf1 100644
--- a/plugins/jira/tasks/worklog_extractor.go
+++ b/plugins/jira/tasks/worklog_extractor.go
@@ -2,6 +2,7 @@
import (
"encoding/json"
+ "github.com/merico-dev/lake/plugins/jira/models"
"github.com/merico-dev/lake/plugins/core"
"github.com/merico-dev/lake/plugins/helper"
@@ -12,6 +13,7 @@
func ExtractWorklogs(taskCtx core.SubTaskContext) error {
data := taskCtx.GetData().(*JiraTaskData)
+ db := taskCtx.GetDb()
extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
Ctx: taskCtx,
@@ -22,8 +24,18 @@
Table: RAW_WORKLOGS_TABLE,
},
Extract: func(row *helper.RawData) ([]interface{}, error) {
+ var input apiv2models.Input
+ err := json.Unmarshal(row.Input, &input)
+ if err != nil {
+ return nil, err
+ }
+ issue := &models.JiraIssue{ConnectionId: data.Connection.ID, IssueId: input.IssueId}
+ err = db.Model(issue).Update("worklog_updated", input.UpdateTime).Error
+ if err != nil {
+ return nil, err
+ }
var worklog apiv2models.Worklog
- err := json.Unmarshal(row.Data, &worklog)
+ err = json.Unmarshal(row.Data, &worklog)
if err != nil {
return nil, err
}