feat: jira supports timefilter by updated_at (#4408)

* feat: jira supports timefilter by updated_date

* fix: createdDateAfter follow previous logic

* fix: remove redundant condition

* refactor: rename UpdatedDateAfter to TimeAfter
diff --git a/models/collector_state.go b/models/collector_state.go
index e0230a4..ee693e1 100644
--- a/models/collector_state.go
+++ b/models/collector_state.go
@@ -27,6 +27,7 @@
 	RawDataParams      string    `gorm:"primaryKey;column:raw_data_params;type:varchar(255);index" json:"raw_data_params"`
 	RawDataTable       string    `gorm:"primaryKey;column:raw_data_table;type:varchar(255)" json:"raw_data_table"`
 	CreatedDateAfter   *time.Time
+	TimeAfter          *time.Time
 	LatestSuccessStart *time.Time
 }
 
diff --git a/models/migrationscripts/20230213_add_updated_date_after_to_collector_state.go b/models/migrationscripts/20230213_add_updated_date_after_to_collector_state.go
new file mode 100644
index 0000000..d487334
--- /dev/null
+++ b/models/migrationscripts/20230213_add_updated_date_after_to_collector_state.go
@@ -0,0 +1,48 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+	"time"
+
+	"github.com/apache/incubator-devlake/errors"
+	"github.com/apache/incubator-devlake/helpers/migrationhelper"
+	"github.com/apache/incubator-devlake/plugins/core"
+)
+
+type collectorLatestState20230213 struct {
+	TimeAfter *time.Time
+}
+
+func (collectorLatestState20230213) TableName() string {
+	return "_devlake_collector_latest_state"
+}
+
+type addTimeAfterToCollectorMeta20230213 struct{}
+
+func (script *addTimeAfterToCollectorMeta20230213) Up(basicRes core.BasicRes) errors.Error {
+	return migrationhelper.AutoMigrateTables(basicRes, &collectorLatestState20230213{})
+}
+
+func (*addTimeAfterToCollectorMeta20230213) Version() uint64 {
+	return 20230213200039
+}
+
+func (*addTimeAfterToCollectorMeta20230213) Name() string {
+	return "add time_after to _devlake_collector_latest_state"
+}
diff --git a/models/migrationscripts/register.go b/models/migrationscripts/register.go
index e8a1b13..ec982a8 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -69,5 +69,6 @@
 		new(encryptTask221221),
 		new(renameProjectMetrics),
 		new(addOriginalTypeToIssue221230),
+		new(addTimeAfterToCollectorMeta20230213),
 	}
 }
diff --git a/plugins/core/plugin_blueprint.go b/plugins/core/plugin_blueprint.go
index 11b2e69..d05d092 100644
--- a/plugins/core/plugin_blueprint.go
+++ b/plugins/core/plugin_blueprint.go
@@ -19,8 +19,9 @@
 
 import (
 	"encoding/json"
-	"github.com/apache/incubator-devlake/errors"
 	"time"
+
+	"github.com/apache/incubator-devlake/errors"
 )
 
 // PipelineTask represents a smallest unit of execution inside a PipelinePlan
@@ -172,7 +173,9 @@
 }
 
 type BlueprintSyncPolicy struct {
-	Version          string     `json:"version" validate:"required,semver,oneof=1.0.0"`
-	SkipOnFail       bool       `json:"skipOnFail"`
+	Version    string `json:"version" validate:"required,semver,oneof=1.0.0"`
+	SkipOnFail bool   `json:"skipOnFail"`
+	// Deprecating
 	CreatedDateAfter *time.Time `json:"createdDateAfter"`
+	TimeAfter        *time.Time `json:"timeAfter"`
 }
diff --git a/plugins/helper/api_collector_with_state.go b/plugins/helper/api_collector_with_state.go
index c8a7a64..5c25bbd 100644
--- a/plugins/helper/api_collector_with_state.go
+++ b/plugins/helper/api_collector_with_state.go
@@ -30,13 +30,15 @@
 	RawDataSubTaskArgs
 	*ApiCollector
 	*GraphqlCollector
-	LatestState      models.CollectorLatestState
+	LatestState models.CollectorLatestState
+	// Deprecating
 	CreatedDateAfter *time.Time
+	TimeAfter        *time.Time
 	ExecuteStart     time.Time
 }
 
-// NewApiCollectorWithState create a new ApiCollectorStateManager
-func NewApiCollectorWithState(args RawDataSubTaskArgs, createdDateAfter *time.Time) (*ApiCollectorStateManager, errors.Error) {
+// NewApiCollectorWithStateEx create a new ApiCollectorStateManager
+func NewApiCollectorWithStateEx(args RawDataSubTaskArgs, createdDateAfter *time.Time, timeAfter *time.Time) (*ApiCollectorStateManager, errors.Error) {
 	db := args.Ctx.GetDal()
 
 	rawDataSubTask, err := NewRawDataSubTask(args)
@@ -59,18 +61,28 @@
 		RawDataSubTaskArgs: args,
 		LatestState:        latestState,
 		CreatedDateAfter:   createdDateAfter,
+		TimeAfter:          timeAfter,
 		ExecuteStart:       time.Now(),
 	}, nil
 }
 
-// IsIncremental return if the old data can support collect incrementally.
-// only when latest collection is success &&
-// (m.LatestState.CreatedDateAfter == nil means all data have been collected ||
-// CreatedDateAfter at this time exists and no before than in the LatestState)
-// if CreatedDateAfter at this time not exists, collect incrementally only when "m.LatestState.CreatedDateAfter == nil"
-func (m ApiCollectorStateManager) IsIncremental() bool {
-	return m.LatestState.LatestSuccessStart != nil &&
-		(m.LatestState.CreatedDateAfter == nil || m.CreatedDateAfter != nil && !m.CreatedDateAfter.Before(*m.LatestState.CreatedDateAfter))
+// NewApiCollectorWithState create a new ApiCollectorStateManager
+func NewApiCollectorWithState(args RawDataSubTaskArgs, createdDateAfter *time.Time) (*ApiCollectorStateManager, errors.Error) {
+	return NewApiCollectorWithStateEx(args, createdDateAfter, nil)
+}
+
+// IsIncremental indicates if the collector should operate in incremental mode
+func (m *ApiCollectorStateManager) IsIncremental() bool {
+	// the initial collection
+	if m.LatestState.LatestSuccessStart == nil {
+		return false
+	}
+	// prioritize TimeAfter parameter: collector should filter data by `updated_date`
+	if m.TimeAfter != nil {
+		return m.LatestState.TimeAfter == nil || !m.TimeAfter.Before(*m.LatestState.TimeAfter)
+	}
+	// fallback to CreatedDateAfter: collector should filter data by `created_date`
+	return m.LatestState.CreatedDateAfter == nil || m.CreatedDateAfter != nil && !m.CreatedDateAfter.Before(*m.LatestState.CreatedDateAfter)
 }
 
 // InitCollector init the embedded collector
@@ -94,11 +106,7 @@
 		return err
 	}
 
-	db := m.Ctx.GetDal()
-	m.LatestState.LatestSuccessStart = &m.ExecuteStart
-	m.LatestState.CreatedDateAfter = m.CreatedDateAfter
-	err = db.CreateOrUpdate(&m.LatestState)
-	return err
+	return m.updateState()
 }
 
 // ExecuteGraphQL the embedded collector and record execute state
@@ -108,9 +116,13 @@
 		return err
 	}
 
+	return m.updateState()
+}
+
+func (m ApiCollectorStateManager) updateState() errors.Error {
 	db := m.Ctx.GetDal()
 	m.LatestState.LatestSuccessStart = &m.ExecuteStart
 	m.LatestState.CreatedDateAfter = m.CreatedDateAfter
-	err = db.CreateOrUpdate(&m.LatestState)
-	return err
+	m.LatestState.TimeAfter = m.TimeAfter
+	return db.CreateOrUpdate(&m.LatestState)
 }
diff --git a/plugins/jira/impl/impl.go b/plugins/jira/impl/impl.go
index 4ad7500..6b56ee3 100644
--- a/plugins/jira/impl/impl.go
+++ b/plugins/jira/impl/impl.go
@@ -219,14 +219,6 @@
 		}
 	}
 
-	var createdDateAfter time.Time
-	if op.CreatedDateAfter != "" {
-		createdDateAfter, err = errors.Convert01(time.Parse(time.RFC3339, op.CreatedDateAfter))
-		if err != nil {
-			return nil, errors.BadInput.Wrap(err, "invalid value for `createdDateAfter`")
-		}
-	}
-
 	info, code, err := tasks.GetJiraServerInfo(jiraApiClient)
 	if err != nil || code != http.StatusOK || info == nil {
 		return nil, errors.HttpStatus(code).Wrap(err, "fail to get Jira server info")
@@ -236,11 +228,24 @@
 		ApiClient:      jiraApiClient,
 		JiraServerInfo: *info,
 	}
-	if !createdDateAfter.IsZero() {
+	if op.CreatedDateAfter != "" {
+		var createdDateAfter time.Time
+		createdDateAfter, err = errors.Convert01(time.Parse(time.RFC3339, op.CreatedDateAfter))
+		if err != nil {
+			return nil, errors.BadInput.Wrap(err, "invalid value for `createdDateAfter`")
+		}
 		taskData.CreatedDateAfter = &createdDateAfter
 		logger.Debug("collect data created from %s", createdDateAfter)
 	}
-
+	if op.TimeAfter != "" {
+		var timeAfter time.Time
+		timeAfter, err = errors.Convert01(time.Parse(time.RFC3339, op.TimeAfter))
+		if err != nil {
+			return nil, errors.BadInput.Wrap(err, "invalid value for `timeAfter`")
+		}
+		taskData.TimeAfter = &timeAfter
+		logger.Debug("collect data created from %s", timeAfter)
+	}
 	return taskData, nil
 }
 
diff --git a/plugins/jira/tasks/issue_collector.go b/plugins/jira/tasks/issue_collector.go
index 332ffa1..edec1b8 100644
--- a/plugins/jira/tasks/issue_collector.go
+++ b/plugins/jira/tasks/issue_collector.go
@@ -44,7 +44,7 @@
 func CollectIssues(taskCtx core.SubTaskContext) errors.Error {
 	data := taskCtx.GetData().(*JiraTaskData)
 
-	collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+	collectorWithState, err := helper.NewApiCollectorWithStateEx(helper.RawDataSubTaskArgs{
 		Ctx: taskCtx,
 		/*
 			This struct will be JSONEncoded and stored into database along with raw data itself, to identity minimal
@@ -58,7 +58,7 @@
 			Table store raw data
 		*/
 		Table: RAW_ISSUE_TABLE,
-	}, data.CreatedDateAfter)
+	}, data.CreatedDateAfter, data.TimeAfter)
 	if err != nil {
 		return err
 	}
@@ -66,13 +66,16 @@
 	// build jql
 	// IMPORTANT: we have to keep paginated data in a consistence order to avoid data-missing, if we sort issues by
 	//  `updated`, issue will be jumping between pages if it got updated during the collection process
-	createdDateAfter := data.CreatedDateAfter
 	jql := "created is not null ORDER BY created ASC"
-	if createdDateAfter != nil {
-		// prepend a time range criteria if `since` was specified, either by user or from database
-		jql = fmt.Sprintf("created >= '%v' AND %v", createdDateAfter.Format("2006/01/02 15:04"), jql)
+
+	// timer filter
+	if data.TimeAfter != nil {
+		jql = fmt.Sprintf("updated >= '%v' AND %v", data.TimeAfter.Format("2006/01/02 15:04"), jql)
+	} else if data.CreatedDateAfter != nil {
+		jql = fmt.Sprintf("created >= '%v' AND %v", data.CreatedDateAfter.Format("2006/01/02 15:04"), jql)
 	}
 
+	// diff sync
 	incremental := collectorWithState.IsIncremental()
 	if incremental {
 		jql = fmt.Sprintf("updated >= '%v' AND %v", collectorWithState.LatestState.LatestSuccessStart.Format("2006/01/02 15:04"), jql)
diff --git a/plugins/jira/tasks/task_data.go b/plugins/jira/tasks/task_data.go
index 866accb..5d9ec4c 100644
--- a/plugins/jira/tasks/task_data.go
+++ b/plugins/jira/tasks/task_data.go
@@ -20,9 +20,10 @@
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/apache/incubator-devlake/errors"
 	"time"
 
+	"github.com/apache/incubator-devlake/errors"
+
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/models"
 )
@@ -95,6 +96,7 @@
 	ConnectionId         uint64 `json:"connectionId"`
 	BoardId              uint64 `json:"boardId"`
 	CreatedDateAfter     string
+	TimeAfter            string
 	TransformationRules  *JiraTransformationRule `json:"transformationRules"`
 	ScopeId              string
 	TransformationRuleId uint64
@@ -104,6 +106,7 @@
 	Options          *JiraOptions
 	ApiClient        *helper.ApiAsyncClient
 	CreatedDateAfter *time.Time
+	TimeAfter        *time.Time
 	JiraServerInfo   models.JiraServerInfo
 }