feat: add collector_meta and CreatedDateAfter for jira (#3813)

* feat: add collector_meta and startFrom for jira

* fix: fix for linter

* feat: record state in framework

* fix: add some comment

* fix: only save state when collect success

* fix: change column name for review
diff --git a/models/collector_state.go b/models/collector_state.go
new file mode 100644
index 0000000..e0230a4
--- /dev/null
+++ b/models/collector_state.go
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import (
+	"time"
+)
+
+type CollectorLatestState struct {
+	CreatedAt          time.Time `json:"createdAt"`
+	UpdatedAt          time.Time `json:"updatedAt"`
+	RawDataParams      string    `gorm:"primaryKey;column:raw_data_params;type:varchar(255);index" json:"raw_data_params"`
+	RawDataTable       string    `gorm:"primaryKey;column:raw_data_table;type:varchar(255)" json:"raw_data_table"`
+	CreatedDateAfter   *time.Time
+	LatestSuccessStart *time.Time
+}
+
+func (CollectorLatestState) TableName() string {
+	return "_devlake_collector_latest_state"
+}
diff --git a/models/migrationscripts/20221125_add_collector_state.go b/models/migrationscripts/20221125_add_collector_state.go
new file mode 100644
index 0000000..8aed5fc
--- /dev/null
+++ b/models/migrationscripts/20221125_add_collector_state.go
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+	"github.com/apache/incubator-devlake/errors"
+	"github.com/apache/incubator-devlake/helpers/migrationhelper"
+	"github.com/apache/incubator-devlake/models/migrationscripts/archived"
+	"github.com/apache/incubator-devlake/plugins/core"
+)
+
+type addCollectorMeta20221125 struct{}
+
+func (script *addCollectorMeta20221125) Up(basicRes core.BasicRes) errors.Error {
+	return migrationhelper.AutoMigrateTables(basicRes, &archived.CollectorLatestState{})
+}
+
+func (*addCollectorMeta20221125) Version() uint64 {
+	return 20221125000038
+}
+
+func (*addCollectorMeta20221125) Name() string {
+	return "save state for collector"
+}
diff --git a/models/migrationscripts/archived/collector_state.go b/models/migrationscripts/archived/collector_state.go
new file mode 100644
index 0000000..e710803
--- /dev/null
+++ b/models/migrationscripts/archived/collector_state.go
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package archived
+
+import (
+	"time"
+)
+
+type CollectorLatestState struct {
+	CreatedAt          time.Time `json:"createdAt"`
+	UpdatedAt          time.Time `json:"updatedAt"`
+	RawDataParams      string    `gorm:"primaryKey;column:raw_data_params;type:varchar(255);index" json:"raw_data_params"`
+	RawDataTable       string    `gorm:"primaryKey;column:raw_data_table;type:varchar(255)" json:"raw_data_table"`
+	CreatedDateAfter   *time.Time
+	LatestSuccessStart *time.Time
+}
+
+func (CollectorLatestState) TableName() string {
+	return "_devlake_collector_latest_state"
+}
diff --git a/models/migrationscripts/register.go b/models/migrationscripts/register.go
index f9d1029..ad39df3 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -63,5 +63,6 @@
 		new(addLabels),
 		new(renameFiledsInProjectPrMetric),
 		new(addEnableToProjectMetric),
+		new(addCollectorMeta20221125),
 	}
 }
diff --git a/plugins/core/plugin_task.go b/plugins/core/plugin_task.go
index ea09e98..5716f5b 100644
--- a/plugins/core/plugin_task.go
+++ b/plugins/core/plugin_task.go
@@ -104,7 +104,7 @@
 	PrepareTaskData(taskCtx TaskContext, options map[string]interface{}) (interface{}, errors.Error)
 }
 
-// CloseablePluginTask Extends PluginTask, and invokes a Close method after all subtasks are done
+// CloseablePluginTask extends PluginTask, and invokes a Close method after all subtasks are done or fail
 type CloseablePluginTask interface {
 	PluginTask
 	Close(taskCtx TaskContext) errors.Error
diff --git a/plugins/helper/api_collector_with_state.go b/plugins/helper/api_collector_with_state.go
new file mode 100644
index 0000000..6404b75
--- /dev/null
+++ b/plugins/helper/api_collector_with_state.go
@@ -0,0 +1,94 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package helper
+
+import (
+	"github.com/apache/incubator-devlake/errors"
+	"github.com/apache/incubator-devlake/models"
+	"github.com/apache/incubator-devlake/plugins/core/dal"
+	"gorm.io/gorm"
+	"time"
+)
+
+// ApiCollectorStateManager saves the collector state in a framework table
+type ApiCollectorStateManager struct {
+	RawDataSubTaskArgs
+	*ApiCollector
+	LatestState      models.CollectorLatestState
+	CreatedDateAfter *time.Time
+	ExecuteStart     time.Time
+}
+
+// NewApiCollectorWithState creates a new ApiCollectorStateManager
+func NewApiCollectorWithState(args RawDataSubTaskArgs, createdDateAfter *time.Time) (*ApiCollectorStateManager, errors.Error) {
+	db := args.Ctx.GetDal()
+
+	rawDataSubTask, err := NewRawDataSubTask(args)
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "Couldn't resolve raw subtask args")
+	}
+	latestState := models.CollectorLatestState{}
+	err = db.First(&latestState, dal.Where(`raw_data_table = ? AND raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
+	if err != nil {
+		if errors.Is(err, gorm.ErrRecordNotFound) {
+			latestState = models.CollectorLatestState{
+				RawDataTable:  rawDataSubTask.table,
+				RawDataParams: rawDataSubTask.params,
+			}
+		} else {
+			return nil, errors.Default.Wrap(err, "failed to load JiraLatestCollectorMeta")
+		}
+	}
+	return &ApiCollectorStateManager{
+		RawDataSubTaskArgs: args,
+		LatestState:        latestState,
+		CreatedDateAfter:   createdDateAfter,
+		ExecuteStart:       time.Now(),
+	}, nil
+}
+
+// CanIncrementCollect returns true if the previously collected data supports incremental collection.
+// Incremental collection is possible only when the latest collection succeeded, AND either
+// m.LatestState.CreatedDateAfter == nil (meaning all historical data has already been collected) OR
+// the current CreatedDateAfter is set and is later than the one recorded in LatestState.
+// If the current CreatedDateAfter is nil, incremental collection requires m.LatestState.CreatedDateAfter == nil.
+func (m ApiCollectorStateManager) CanIncrementCollect() bool {
+	return m.LatestState.LatestSuccessStart != nil &&
+		(m.LatestState.CreatedDateAfter == nil || m.CreatedDateAfter != nil && m.CreatedDateAfter.After(*m.LatestState.CreatedDateAfter))
+}
+
+// InitCollector initializes the embedded collector
+func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs) (err errors.Error) {
+	args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
+	m.ApiCollector, err = NewApiCollector(args)
+	return err
+}
+
+// Execute runs the embedded collector and records the execution state on success
+func (m ApiCollectorStateManager) Execute() errors.Error {
+	err := m.ApiCollector.Execute()
+	if err != nil {
+		return err
+	}
+
+	db := m.Ctx.GetDal()
+	m.LatestState.LatestSuccessStart = &m.ExecuteStart
+	m.LatestState.CreatedDateAfter = m.CreatedDateAfter
+	err = db.CreateOrUpdate(&m.LatestState)
+	return err
+}
diff --git a/plugins/jira/impl/impl.go b/plugins/jira/impl/impl.go
index 29aaf36..2aff632 100644
--- a/plugins/jira/impl/impl.go
+++ b/plugins/jira/impl/impl.go
@@ -171,9 +171,9 @@
 		return nil, errors.Default.Wrap(err, "unable to get Jira connection")
 	}
 
-	var since time.Time
-	if op.Since != "" {
-		since, err = time.Parse("2006-01-02T15:04:05Z", op.Since)
+	var createdDateAfter time.Time
+	if op.CreatedDateAfter != "" {
+		createdDateAfter, err = time.Parse("2006-01-02T15:04:05Z", op.CreatedDateAfter)
 		if err != nil {
 			return nil, errors.BadInput.Wrap(err, "invalid value for `since`")
 		}
@@ -208,10 +208,11 @@
 		ApiClient:      jiraApiClient,
 		JiraServerInfo: *info,
 	}
-	if !since.IsZero() {
-		taskData.Since = &since
-		logger.Debug("collect data updated since %s", since)
+	if !createdDateAfter.IsZero() {
+		taskData.CreatedDateAfter = &createdDateAfter
+		logger.Debug("collect data created from %s", createdDateAfter)
 	}
+
 	return taskData, nil
 }
 
diff --git a/plugins/jira/jira.go b/plugins/jira/jira.go
index 0cf85de..dcb50c3 100644
--- a/plugins/jira/jira.go
+++ b/plugins/jira/jira.go
@@ -32,12 +32,12 @@
 	boardId := cmd.Flags().Uint64P("board", "b", 0, "jira board id")
 	_ = cmd.MarkFlagRequired("connection")
 	_ = cmd.MarkFlagRequired("board")
-	since := cmd.Flags().StringP("since", "s", "", "collect data that are updated after specified time, ie 2006-05-06T07:08:09Z")
+	CreatedDateAfter := cmd.Flags().StringP("created_date_after", "a", "", "collect data that are updated after specified time, ie 2006-05-06T07:08:09Z")
 	cmd.Run = func(c *cobra.Command, args []string) {
 		runner.DirectRun(c, args, PluginEntry, map[string]interface{}{
-			"connectionId": *connectionId,
-			"boardId":      *boardId,
-			"since":        *since,
+			"connectionId":     *connectionId,
+			"boardId":          *boardId,
+			"CreatedDateAfter": *CreatedDateAfter,
 		})
 	}
 	runner.RunCmd(cmd)
diff --git a/plugins/jira/tasks/epic_collector.go b/plugins/jira/tasks/epic_collector.go
index 78e6c31..ef4c270 100644
--- a/plugins/jira/tasks/epic_collector.go
+++ b/plugins/jira/tasks/epic_collector.go
@@ -53,12 +53,7 @@
 	if err != nil {
 		return err
 	}
-	since := data.Since
 	jql := "ORDER BY created ASC"
-	if since != nil {
-		// prepend a time range criteria if `since` was specified, either by user or from database
-		jql = fmt.Sprintf("updated >= '%s' %s", since.Format("2006/01/02 15:04"), jql)
-	}
 	collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
 		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
 			Ctx: taskCtx,
diff --git a/plugins/jira/tasks/issue_changelog_collector.go b/plugins/jira/tasks/issue_changelog_collector.go
index dc3e6f1..30c47cd 100644
--- a/plugins/jira/tasks/issue_changelog_collector.go
+++ b/plugins/jira/tasks/issue_changelog_collector.go
@@ -52,6 +52,18 @@
 	log := taskCtx.GetLogger()
 	db := taskCtx.GetDal()
 
+	collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+		Ctx: taskCtx,
+		Params: JiraApiParams{
+			ConnectionId: data.Options.ConnectionId,
+			BoardId:      data.Options.BoardId,
+		},
+		Table: RAW_CHANGELOG_TABLE,
+	}, data.CreatedDateAfter)
+	if err != nil {
+		return err
+	}
+
 	// query for issue_ids that needed changelog collection
 	clauses := []dal.Clause{
 		dal.Select("i.issue_id, i.updated AS update_time"),
@@ -60,12 +72,10 @@
 		dal.Join("LEFT JOIN _tool_jira_issue_changelogs c ON (c.connection_id = i.connection_id AND c.issue_id = i.issue_id)"),
 		dal.Where("i.updated > i.created AND bi.connection_id = ?  AND bi.board_id = ? AND i.std_type != ? ", data.Options.ConnectionId, data.Options.BoardId, "Epic"),
 		dal.Groupby("i.issue_id, i.updated"),
-		dal.Having("i.updated > max(c.issue_updated) OR  (max(c.issue_updated) IS NULL AND COUNT(c.changelog_id) > 0)"),
 	}
-	// apply time range if any
-	since := data.Since
-	if since != nil {
-		clauses = append(clauses, dal.Where("i.updated > ?", *since))
+	incremental := collectorWithState.CanIncrementCollect()
+	if incremental {
+		clauses = append(clauses, dal.Having("i.updated > max(c.issue_updated) OR  (max(c.issue_updated) IS NULL AND COUNT(c.changelog_id) > 0)"))
 	}
 
 	if log.IsLevelEnabled(core.LOG_DEBUG) {
@@ -88,18 +98,10 @@
 	}
 
 	// now, let ApiCollector takes care the rest
-	collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
-		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
-			Ctx: taskCtx,
-			Params: JiraApiParams{
-				ConnectionId: data.Options.ConnectionId,
-				BoardId:      data.Options.BoardId,
-			},
-			Table: RAW_CHANGELOG_TABLE,
-		},
+	err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
 		ApiClient:     data.ApiClient,
 		PageSize:      100,
-		Incremental:   since == nil,
+		Incremental:   incremental,
 		GetTotalPages: GetTotalPagesFromResponse,
 		Input:         iterator,
 		UrlTemplate:   "api/3/issue/{{ .Input.IssueId }}/changelog",
@@ -127,5 +129,5 @@
 		return err
 	}
 
-	return collector.Execute()
+	return collectorWithState.Execute()
 }
diff --git a/plugins/jira/tasks/issue_collector.go b/plugins/jira/tasks/issue_collector.go
index 50db842..2120e6b 100644
--- a/plugins/jira/tasks/issue_collector.go
+++ b/plugins/jira/tasks/issue_collector.go
@@ -21,26 +21,20 @@
 	"encoding/json"
 	goerror "errors"
 	"fmt"
-	"github.com/apache/incubator-devlake/errors"
+	"gorm.io/gorm"
 	"io"
 	"net/http"
 	"net/url"
 
+	"github.com/apache/incubator-devlake/errors"
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/models"
-	"gorm.io/gorm"
 )
 
 const RAW_ISSUE_TABLE = "jira_api_issues"
 
-// this struct should be moved to `jira_api_common.go`
-type JiraApiParams struct {
-	ConnectionId uint64
-	BoardId      uint64
-}
-
 var _ core.SubTaskEntryPoint = CollectIssues
 
 var CollectIssuesMeta = core.SubTaskMeta{
@@ -55,10 +49,38 @@
 	db := taskCtx.GetDal()
 	data := taskCtx.GetData().(*JiraTaskData)
 
-	since := data.Since
-	incremental := false
-	// user didn't specify a time range to sync, try load from database
-	if since == nil {
+	collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+		Ctx: taskCtx,
+		/*
+			This struct will be JSONEncoded and stored into database along with raw data itself, to identify the minimal
+			set of data to be processed; for example, we process JiraIssues by Board
+		*/
+		Params: JiraApiParams{
+			ConnectionId: data.Options.ConnectionId,
+			BoardId:      data.Options.BoardId,
+		},
+		/*
+			Table stores the raw data
+		*/
+		Table: RAW_ISSUE_TABLE,
+	}, data.CreatedDateAfter)
+	if err != nil {
+		return err
+	}
+
+	// build jql
+	// IMPORTANT: we have to keep paginated data in a consistent order to avoid missing data; if we sort issues by
+	//  `updated`, an issue may jump between pages if it gets updated during the collection process
+	createdDateAfter := data.CreatedDateAfter
+	jql := "ORDER BY created ASC"
+	if createdDateAfter != nil {
+		// prepend a time-range criterion if `createdDateAfter` was specified by the user
+		jql = fmt.Sprintf("created >= '%v' %v", createdDateAfter.Format("2006/01/02 15:04"), jql)
+	}
+
+	incremental := collectorWithState.CanIncrementCollect()
+	if incremental {
+		// incremental mode: load the latest issue update time from the database to narrow the JQL
 		var latestUpdated models.JiraIssue
 		clauses := []dal.Clause{
 			dal.Select("_tool_jira_issues.*"),
@@ -72,35 +94,13 @@
 			return errors.NotFound.Wrap(err, "failed to get latest jira issue record")
 		}
 		if latestUpdated.IssueId > 0 {
-			since = &latestUpdated.Updated
-			incremental = true
+			jql = fmt.Sprintf("updated >= '%v' %v", latestUpdated.Updated.Format("2006/01/02 15:04"), jql)
+		} else {
+			incremental = false
 		}
 	}
-	// build jql
-	// IMPORTANT: we have to keep paginated data in a consistence order to avoid data-missing, if we sort issues by
-	//  `updated`, issue will be jumping between pages if it got updated during the collection process
-	jql := "ORDER BY created ASC"
-	if since != nil {
-		// prepend a time range criteria if `since` was specified, either by user or from database
-		jql = fmt.Sprintf("updated >= '%v' %v", since.Format("2006/01/02 15:04"), jql)
-	}
 
-	collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
-		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
-			Ctx: taskCtx,
-			/*
-				This struct will be JSONEncoded and stored into database along with raw data itself, to identity minimal
-				set of data to be process, for example, we process JiraIssues by Board
-			*/
-			Params: JiraApiParams{
-				ConnectionId: data.Options.ConnectionId,
-				BoardId:      data.Options.BoardId,
-			},
-			/*
-				Table store raw data
-			*/
-			Table: RAW_ISSUE_TABLE,
-		},
+	err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
 		ApiClient:   data.ApiClient,
 		PageSize:    100,
 		Incremental: incremental,
@@ -159,10 +159,9 @@
 			return data.Issues, nil
 		},
 	})
-
 	if err != nil {
 		return err
 	}
 
-	return collector.Execute()
+	return collectorWithState.Execute()
 }
diff --git a/plugins/jira/tasks/issue_extractor.go b/plugins/jira/tasks/issue_extractor.go
index bf6f623..7f61c52 100644
--- a/plugins/jira/tasks/issue_extractor.go
+++ b/plugins/jira/tasks/issue_extractor.go
@@ -111,7 +111,7 @@
 	if issue.ResolutionDate != nil {
 		issue.LeadTimeMinutes = uint(issue.ResolutionDate.Unix()-issue.Created.Unix()) / 60
 	}
-	if data.Options.TransformationRules.StoryPointField != "" {
+	if data.Options.TransformationRules != nil && data.Options.TransformationRules.StoryPointField != "" {
 		unknownStoryPoint := apiIssue.Fields.AllFields[data.Options.TransformationRules.StoryPointField]
 		switch sp := unknownStoryPoint.(type) {
 		case string:
@@ -193,9 +193,11 @@
 	}
 	stdTypeMappings := make(map[string]string)
 	standardStatusMappings := make(map[string]StatusMappings)
-	for userType, stdType := range data.Options.TransformationRules.TypeMappings {
-		stdTypeMappings[userType] = strings.ToUpper(stdType.StandardType)
-		standardStatusMappings[userType] = stdType.StatusMappings
+	if data.Options.TransformationRules != nil {
+		for userType, stdType := range data.Options.TransformationRules.TypeMappings {
+			stdTypeMappings[userType] = strings.ToUpper(stdType.StandardType)
+			standardStatusMappings[userType] = stdType.StatusMappings
+		}
 	}
 	return &typeMappings{
 		typeIdMappings:         typeIdMapping,
diff --git a/plugins/jira/tasks/remotelink_collector.go b/plugins/jira/tasks/remotelink_collector.go
index b641f9d..4867225 100644
--- a/plugins/jira/tasks/remotelink_collector.go
+++ b/plugins/jira/tasks/remotelink_collector.go
@@ -47,6 +47,18 @@
 	logger := taskCtx.GetLogger()
 	logger.Info("collect remotelink")
 
+	collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+		Ctx: taskCtx,
+		Params: JiraApiParams{
+			ConnectionId: data.Options.ConnectionId,
+			BoardId:      data.Options.BoardId,
+		},
+		Table: RAW_REMOTELINK_TABLE,
+	}, data.CreatedDateAfter)
+	if err != nil {
+		return err
+	}
+
 	clauses := []dal.Clause{
 		dal.Select("i.issue_id, i.updated AS update_time"),
 		dal.From("_tool_jira_board_issues bi"),
@@ -54,12 +66,14 @@
 		dal.Join("LEFT JOIN _tool_jira_remotelinks rl ON (rl.connection_id = i.connection_id AND rl.issue_id = i.issue_id)"),
 		dal.Where("i.updated > i.created AND bi.connection_id = ?  AND bi.board_id = ?  ", data.Options.ConnectionId, data.Options.BoardId),
 		dal.Groupby("i.issue_id, i.updated"),
-		dal.Having("i.updated > max(rl.issue_updated) OR  max(rl.issue_updated) IS NULL"),
 	}
-	// apply time range if any
-	since := data.Since
-	if since != nil {
-		clauses = append(clauses, dal.Where("i.updated > ?", *since))
+	incremental := collectorWithState.CanIncrementCollect()
+	if incremental {
+		if collectorWithState.LatestState.LatestSuccessStart != nil {
+			clauses = append(clauses, dal.Having("i.updated > ? AND (i.updated > max(rl.issue_updated) OR max(rl.issue_updated) IS NULL)", collectorWithState.LatestState.LatestSuccessStart))
+		} else {
+			clauses = append(clauses, dal.Having("i.updated > max(rl.issue_updated) OR max(rl.issue_updated) IS NULL"))
+		}
 	}
 	cursor, err := db.Cursor(clauses...)
 	if err != nil {
@@ -73,18 +87,10 @@
 		return err
 	}
 
-	collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
-		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
-			Ctx: taskCtx,
-			Params: JiraApiParams{
-				ConnectionId: data.Options.ConnectionId,
-				BoardId:      data.Options.BoardId,
-			},
-			Table: RAW_REMOTELINK_TABLE,
-		},
+	err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
 		ApiClient:   data.ApiClient,
 		Input:       iterator,
-		Incremental: since == nil,
+		Incremental: incremental,
 		UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/remotelink",
 		ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
 			if res.StatusCode == http.StatusNotFound {
@@ -102,7 +108,7 @@
 	if err != nil {
 		return err
 	}
-	err = collector.Execute()
+	err = collectorWithState.Execute()
 	if err != nil {
 		return err
 	}
diff --git a/plugins/jira/tasks/remotelink_extractor.go b/plugins/jira/tasks/remotelink_extractor.go
index b857e26..0418a1a 100644
--- a/plugins/jira/tasks/remotelink_extractor.go
+++ b/plugins/jira/tasks/remotelink_extractor.go
@@ -45,7 +45,8 @@
 	logger.Info("extract remote links")
 	var commitShaRegex *regexp.Regexp
 	var err error
-	if pattern := data.Options.TransformationRules.RemotelinkCommitShaPattern; pattern != "" {
+	if data.Options.TransformationRules != nil && data.Options.TransformationRules.RemotelinkCommitShaPattern != "" {
+		pattern := data.Options.TransformationRules.RemotelinkCommitShaPattern
 		commitShaRegex, err = regexp.Compile(pattern)
 		if err != nil {
 			return errors.Default.Wrap(err, "regexp Compile pattern failed")
diff --git a/plugins/jira/tasks/task_data.go b/plugins/jira/tasks/task_data.go
index b4bf770..1e99812 100644
--- a/plugins/jira/tasks/task_data.go
+++ b/plugins/jira/tasks/task_data.go
@@ -94,17 +94,22 @@
 type JiraOptions struct {
 	ConnectionId         uint64 `json:"connectionId"`
 	BoardId              uint64 `json:"boardId"`
-	Since                string
+	CreatedDateAfter     string
 	TransformationRules  *JiraTransformationRule `json:"transformationRules"`
 	ScopeId              string
 	TransformationRuleId uint64
 }
 
 type JiraTaskData struct {
-	Options        *JiraOptions
-	ApiClient      *helper.ApiAsyncClient
-	Since          *time.Time
-	JiraServerInfo models.JiraServerInfo
+	Options          *JiraOptions
+	ApiClient        *helper.ApiAsyncClient
+	CreatedDateAfter *time.Time
+	JiraServerInfo   models.JiraServerInfo
+}
+
+type JiraApiParams struct {
+	ConnectionId uint64
+	BoardId      uint64
 }
 
 func DecodeAndValidateTaskOptions(options map[string]interface{}) (*JiraOptions, errors.Error) {
diff --git a/plugins/jira/tasks/worklog_collector.go b/plugins/jira/tasks/worklog_collector.go
index 0c3ce7a..3958a81 100644
--- a/plugins/jira/tasks/worklog_collector.go
+++ b/plugins/jira/tasks/worklog_collector.go
@@ -42,10 +42,21 @@
 func CollectWorklogs(taskCtx core.SubTaskContext) errors.Error {
 	db := taskCtx.GetDal()
 	data := taskCtx.GetData().(*JiraTaskData)
-	since := data.Since
 
 	logger := taskCtx.GetLogger()
 
+	collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+		Ctx: taskCtx,
+		Params: JiraApiParams{
+			ConnectionId: data.Options.ConnectionId,
+			BoardId:      data.Options.BoardId,
+		},
+		Table: RAW_WORKLOGS_TABLE,
+	}, data.CreatedDateAfter)
+	if err != nil {
+		return err
+	}
+
 	// filter out issue_ids that needed collection
 	clauses := []dal.Clause{
 		dal.Select("i.issue_id, i.updated AS update_time"),
@@ -54,11 +65,10 @@
 		dal.Join("LEFT JOIN _tool_jira_worklogs wl ON (wl.connection_id = i.connection_id AND wl.issue_id = i.issue_id)"),
 		dal.Where("i.updated > i.created AND bi.connection_id = ?  AND bi.board_id = ?  ", data.Options.ConnectionId, data.Options.BoardId),
 		dal.Groupby("i.issue_id, i.updated"),
-		dal.Having("i.updated > max(wl.issue_updated) OR  (max(wl.issue_updated) IS NULL AND COUNT(wl.worklog_id) > 0)"),
 	}
-	// apply time range if any
-	if since != nil {
-		clauses = append(clauses, dal.Where("i.updated > ?", *since))
+	incremental := collectorWithState.CanIncrementCollect()
+	if incremental {
+		clauses = append(clauses, dal.Having("i.updated > max(wl.issue_updated) OR  (max(wl.issue_updated) IS NULL AND COUNT(wl.worklog_id) > 0)"))
 	}
 
 	// construct the input iterator
@@ -71,20 +81,12 @@
 		return err
 	}
 
-	collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
-		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
-			Ctx: taskCtx,
-			Params: JiraApiParams{
-				ConnectionId: data.Options.ConnectionId,
-				BoardId:      data.Options.BoardId,
-			},
-			Table: RAW_WORKLOGS_TABLE,
-		},
+	err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
 		Input:         iterator,
 		ApiClient:     data.ApiClient,
 		UrlTemplate:   "api/2/issue/{{ .Input.IssueId }}/worklog",
 		PageSize:      50,
-		Incremental:   since == nil,
+		Incremental:   incremental,
 		GetTotalPages: GetTotalPagesFromResponse,
 		ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
 			var data struct {
@@ -103,5 +105,5 @@
 		return err
 	}
 
-	return collector.Execute()
+	return collectorWithState.Execute()
 }