fix: improve jira changelog and worklog collecting (#1960)

* fix: improve jira changelog and worklog collecting

* fix: always reset IsRemoved when a sprint issue is re-added (sprint_issues_convertor)
diff --git a/plugins/jira/jira.go b/plugins/jira/jira.go
index 31e8292..561b14d 100644
--- a/plugins/jira/jira.go
+++ b/plugins/jira/jira.go
@@ -127,7 +127,12 @@
 }

 func (plugin Jira) MigrationScripts() []migration.Script {
-	return []migration.Script{new(migrationscripts.InitSchemas), new(migrationscripts.UpdateSchemas20220505), new(migrationscripts.UpdateSchemas20220507)}
+	return []migration.Script{
+		new(migrationscripts.InitSchemas),
+		new(migrationscripts.UpdateSchemas20220505),
+		new(migrationscripts.UpdateSchemas20220507),
+		new(migrationscripts.UpdateSchemas20220518), // adds _tool_jira_issues.worklog_updated
+	}
 }

 func (plugin Jira) ApiResources() map[string]map[string]core.ApiResourceHandler {
diff --git a/plugins/jira/models/issue.go b/plugins/jira/models/issue.go
index 97cfaf1..ceb960a 100644
--- a/plugins/jira/models/issue.go
+++ b/plugins/jira/models/issue.go
@@ -49,6 +49,7 @@
 	// internal status tracking
 	ChangelogUpdated  *time.Time
 	RemotelinkUpdated *time.Time
+	WorklogUpdated    *time.Time // set by ExtractWorklogs; the collector re-fetches worklogs when Updated is later
 	common.NoPKModel
 }

diff --git a/plugins/jira/models/migrationscripts/update_schemas20220518.go b/plugins/jira/models/migrationscripts/update_schemas20220518.go
new file mode 100644
index 0000000..4e947bb
--- /dev/null
+++ b/plugins/jira/models/migrationscripts/update_schemas20220518.go
@@ -0,0 +1,88 @@
+package migrationscripts
+
+import (
+	"context"
+	"time"
+
+	"gorm.io/datatypes"
+	"gorm.io/gorm"
+
+	"github.com/merico-dev/lake/models/migrationscripts/archived"
+)
+
+// JiraIssue20220518 is this migration's own copy of the _tool_jira_issues
+// columns. It embeds archived.NoPKModel rather than common.NoPKModel so the
+// migration does not depend on the live jira models.
+type JiraIssue20220518 struct {
+	// collected fields
+	SourceId                 uint64 `gorm:"primaryKey"`
+	IssueId                  uint64 `gorm:"primaryKey"`
+	ProjectId                uint64
+	Self                     string `gorm:"type:varchar(255)"`
+	IconURL                  string `gorm:"type:varchar(255);column:icon_url"`
+	Key                      string `gorm:"type:varchar(255)"`
+	Summary                  string
+	Type                     string `gorm:"type:varchar(255)"`
+	EpicKey                  string `gorm:"type:varchar(255)"`
+	StatusName               string `gorm:"type:varchar(255)"`
+	StatusKey                string `gorm:"type:varchar(255)"`
+	StoryPoint               float64
+	OriginalEstimateMinutes  int64  // user input?
+	AggregateEstimateMinutes int64  // sum up of all subtasks?
+	RemainingEstimateMinutes int64  // could it be negative value?
+	CreatorAccountId         string `gorm:"type:varchar(255)"`
+	CreatorAccountType       string `gorm:"type:varchar(255)"`
+	CreatorDisplayName       string `gorm:"type:varchar(255)"`
+	AssigneeAccountId        string `gorm:"type:varchar(255);comment:latest assignee"`
+	AssigneeAccountType      string `gorm:"type:varchar(255)"`
+	AssigneeDisplayName      string `gorm:"type:varchar(255)"`
+	PriorityId               uint64
+	PriorityName             string `gorm:"type:varchar(255)"`
+	ParentId                 uint64
+	ParentKey                string `gorm:"type:varchar(255)"`
+	SprintId                 uint64 // latest sprint, issue might cross multiple sprints, would be addressed by #514
+	SprintName               string `gorm:"type:varchar(255)"`
+	ResolutionDate           *time.Time
+	Created                  time.Time
+	Updated                  time.Time `gorm:"index"`
+	SpentMinutes             int64
+	LeadTimeMinutes          uint
+	StdStoryPoint            uint
+	StdType                  string `gorm:"type:varchar(255)"`
+	StdStatus                string `gorm:"type:varchar(255)"`
+	AllFields                datatypes.JSONMap
+
+	// internal status tracking
+	ChangelogUpdated  *time.Time
+	RemotelinkUpdated *time.Time
+	WorklogUpdated    *time.Time
+	archived.NoPKModel
+}
+
+// TableName maps JiraIssue20220518 onto the existing _tool_jira_issues table.
+func (JiraIssue20220518) TableName() string {
+	return "_tool_jira_issues"
+}
+
+// UpdateSchemas20220518 adds the worklog_updated column to _tool_jira_issues.
+type UpdateSchemas20220518 struct{}
+
+// Up adds the worklog_updated column to _tool_jira_issues. It does nothing if
+// the column already exists, so a repeated run does not fail.
+func (*UpdateSchemas20220518) Up(ctx context.Context, db *gorm.DB) error {
+	migrator := db.Migrator()
+	if migrator.HasColumn(&JiraIssue20220518{}, "worklog_updated") {
+		return nil
+	}
+	return migrator.AddColumn(&JiraIssue20220518{}, "worklog_updated")
+}
+
+// Version returns this migration's version number.
+func (*UpdateSchemas20220518) Version() uint64 {
+	return 20220518132510
+}
+
+// Name returns a human-readable description of this migration.
+func (*UpdateSchemas20220518) Name() string {
+	return "Add worklog_updated column to JiraIssue"
+}
diff --git a/plugins/jira/tasks/changelog_collector.go b/plugins/jira/tasks/changelog_collector.go
index d6f265d..9ef753d 100644
--- a/plugins/jira/tasks/changelog_collector.go
+++ b/plugins/jira/tasks/changelog_collector.go
@@ -25,19 +25,6 @@
 	db := taskCtx.GetDb()
 	// figure out the time range
 	since := data.Since
-	incremental := false
-	if since == nil {
-		// user didn't specify a time range to sync, try load from database
-		var latestUpdated models.JiraChangelog
-		err := db.Where("connection_id = ?", data.Connection.ID).Order("created DESC").Limit(1).Find(&latestUpdated).Error
-		if err != nil {
-			return fmt.Errorf("failed to get latest jira changelog record: %w", err)
-		}
-		if latestUpdated.ChangelogId > 0 {
-			since = &latestUpdated.Created
-			incremental = true
-		}
-	}

 	// filter out issue_ids that needed collection
 	tx := db.Table("_tool_jira_board_issues bi").
@@ -73,7 +60,7 @@
 		},
 		ApiClient:   data.ApiClient,
 		PageSize:    50,
-		Incremental: incremental,
+		Incremental: true, // always on now: the issue filter above decides what is re-collected
 		Input:       iterator,
 		UrlTemplate: "api/3/issue/{{ .Input.IssueId }}/changelog",
 		Query: func(reqData *helper.RequestData) (url.Values, error) {
diff --git a/plugins/jira/tasks/sprint_issues_convertor.go b/plugins/jira/tasks/sprint_issues_convertor.go
index d2ae823..e422780 100644
--- a/plugins/jira/tasks/sprint_issues_convertor.go
+++ b/plugins/jira/tasks/sprint_issues_convertor.go
@@ -232,10 +232,12 @@
 		return nil
 	}
 	if item, ok := c.sprintIssue[key]; ok {
-		if item != nil && (item.AddedDate == nil || item.AddedDate != nil && item.AddedDate.After(cl.Created)) {
-			item.AddedDate = &cl.Created
-			item.AddedStage = addedStage
+		if item != nil {
 			item.IsRemoved = false
+			if item.AddedDate == nil || item.AddedDate.After(cl.Created) { // keep the earliest added date
+				item.AddedDate = &cl.Created
+				item.AddedStage = addedStage
+			}
 		}
 	} else {
 		c.sprintIssue[key] = &ticket.SprintIssue{
@@ -336,7 +338,7 @@
 }

 func (c *SprintIssuesConverter) getStage(t time.Time, connectionId, sprintId uint64) (*string, error) {
-	sprint, err := c.getJiraSprint(sprintId, connectionId)
+	sprint, err := c.getJiraSprint(connectionId, sprintId)
 	if err != nil {
 		return nil, err
 	}
diff --git a/plugins/jira/tasks/worklog_collector.go b/plugins/jira/tasks/worklog_collector.go
index 033f448..7f5bed6 100644
--- a/plugins/jira/tasks/worklog_collector.go
+++ b/plugins/jira/tasks/worklog_collector.go
@@ -2,13 +2,11 @@

 import (
 	"encoding/json"
-	"fmt"
 	"net/http"
 	"reflect"

 	"github.com/merico-dev/lake/plugins/core"
 	"github.com/merico-dev/lake/plugins/helper"
-	"github.com/merico-dev/lake/plugins/jira/models"
 	"github.com/merico-dev/lake/plugins/jira/tasks/apiv2models"
 )

@@ -18,29 +16,17 @@
 	db := taskCtx.GetDb()
 	data := taskCtx.GetData().(*JiraTaskData)
 	since := data.Since
-	incremental := false
-
-	if since == nil {
-		var latestUpdated models.JiraWorklog
-		err := db.Where("connection_id = ?", data.Connection.ID).Order("updated DESC").Limit(1).Find(&latestUpdated).Error
-		if err != nil {
-			return fmt.Errorf("failed to get latest jira issue worklog record: %w", err)
-		}
-		if latestUpdated.IssueId > 0 {
-			since = &latestUpdated.Updated
-			incremental = true
-		}
-	}

 	logger := taskCtx.GetLogger()
 	connectionId := data.Connection.ID
 	boardId := data.Options.BoardId
-	tx := db.Model(&models.JiraIssue{}).
-		Joins("left join _tool_jira_board_issues on _tool_jira_issues.issue_id = _tool_jira_board_issues.issue_id").
-		Select("_tool_jira_board_issues.issue_id").Where("_tool_jira_board_issues.connection_id = ? AND _tool_jira_board_issues.board_id = ?", connectionId, boardId)
+	tx := db.Table("_tool_jira_board_issues bi").
+		Select("bi.issue_id, NOW() AS update_time"). // presumably read back as Input.UpdateTime and stored into worklog_updated by ExtractWorklogs
+		Joins("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)").
+		Where("bi.connection_id = ? AND bi.board_id = ? AND (i.worklog_updated IS NULL OR i.worklog_updated < i.updated)", connectionId, boardId)

 	if since != nil {
-		tx = tx.Where("_tool_jira_issues.updated > ?", since)
+		tx = tx.Where("i.updated > ?", since)
 	}
 	cursor, err := tx.Rows()
 	if err != nil {
@@ -64,7 +50,7 @@
 		ApiClient:     data.ApiClient,
 		UrlTemplate:   "api/2/issue/{{ .Input.IssueId }}/worklog",
 		PageSize:      50,
-		Incremental:   incremental,
+		Incremental:   true,
 		GetTotalPages: GetTotalPagesFromResponse,
 		ResponseParser: func(res *http.Response) ([]json.RawMessage, error) {
 			var data struct {
diff --git a/plugins/jira/tasks/worklog_extractor.go b/plugins/jira/tasks/worklog_extractor.go
index 4f0820c..e9b0bf1 100644
--- a/plugins/jira/tasks/worklog_extractor.go
+++ b/plugins/jira/tasks/worklog_extractor.go
@@ -2,6 +2,7 @@

 import (
 	"encoding/json"
+	"github.com/merico-dev/lake/plugins/jira/models"

 	"github.com/merico-dev/lake/plugins/core"
 	"github.com/merico-dev/lake/plugins/helper"
@@ -12,6 +13,7 @@

 func ExtractWorklogs(taskCtx core.SubTaskContext) error {
 	data := taskCtx.GetData().(*JiraTaskData)
+	db := taskCtx.GetDb()
 	extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
 		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
 			Ctx: taskCtx,
@@ -22,8 +24,18 @@
 			Table: RAW_WORKLOGS_TABLE,
 		},
 		Extract: func(row *helper.RawData) ([]interface{}, error) {
+			var input apiv2models.Input // per-issue collector input; UpdateTime presumably comes from NOW() AS update_time
+			err := json.Unmarshal(row.Input, &input)
+			if err != nil {
+				return nil, err
+			}
+			issue := &models.JiraIssue{ConnectionId: data.Connection.ID, IssueId: input.IssueId}
+			err = db.Model(issue).Update("worklog_updated", input.UpdateTime).Error // collector skips the issue until its updated time passes this
+			if err != nil {
+				return nil, err
+			}
 			var worklog apiv2models.Worklog
-			err := json.Unmarshal(row.Data, &worklog)
+			err = json.Unmarshal(row.Data, &worklog)
 			if err != nil {
 				return nil, err
 			}