Merge pull request #1810 from merico-dev/unit-test
Unit test for api_collect
diff --git a/go.mod b/go.mod
index 2249b5a..6765f06 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@
go 1.17
require (
- github.com/agiledragon/gomonkey v2.0.2+incompatible
+ github.com/agiledragon/gomonkey/v2 v2.7.0
github.com/gin-contrib/cors v1.3.1
github.com/gin-gonic/gin v1.7.4
github.com/go-git/go-git/v5 v5.4.2
diff --git a/go.sum b/go.sum
index b7f076a..5b86e01 100644
--- a/go.sum
+++ b/go.sum
@@ -49,8 +49,8 @@
github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7/go.mod h1:z4/9nQmJSSwwds7ejkxaJwO37dru3geImFUdJlaLzQo=
github.com/acomagu/bufpipe v1.0.3 h1:fxAGrHZTgQ9w5QqVItgzwj235/uYZYgbXitB+dLupOk=
github.com/acomagu/bufpipe v1.0.3/go.mod h1:mxdxdup/WdsKVreO5GpW4+M/1CE2sMG4jeGJ2sYmHc4=
-github.com/agiledragon/gomonkey v2.0.2+incompatible h1:eXKi9/piiC3cjJD1658mEE2o3NjkJ5vDLgYjCQu0Xlw=
-github.com/agiledragon/gomonkey v2.0.2+incompatible/go.mod h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw=
+github.com/agiledragon/gomonkey/v2 v2.7.0 h1:CFT/xdr6xbsIN04Yll4OhKq/vPm0MVD8ykV99jDBesM=
+github.com/agiledragon/gomonkey/v2 v2.7.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 h1:kFOfPq6dUM1hTo4JG6LR5AXSUEsOjtdm0kw0FtQtMJA=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
diff --git a/plugins/helper/api_async_client_test.go b/plugins/helper/api_async_client_test.go
index 3d706e7..c29e2fd 100644
--- a/plugins/helper/api_async_client_test.go
+++ b/plugins/helper/api_async_client_test.go
@@ -13,7 +13,7 @@
"testing"
"time"
- "github.com/agiledragon/gomonkey"
+ "github.com/agiledragon/gomonkey/v2"
"github.com/merico-dev/lake/plugins/core"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
diff --git a/plugins/helper/api_collector.go b/plugins/helper/api_collector.go
index 069a5ab..b4e0fb8 100644
--- a/plugins/helper/api_collector.go
+++ b/plugins/helper/api_collector.go
@@ -137,6 +137,9 @@
defer iterator.Close()
// throttle input process speed so it can be canceled, create a channel to represent available slots
slots := int(math.Ceil(collector.args.ApiClient.GetQps())) * 2
+ if slots <= 0 {
+ return fmt.Errorf("RateLimit can't use the 0 Qps")
+ }
slotsChan := make(chan bool, slots)
defer close(slotsChan)
for i := 0; i < slots; i++ {
@@ -148,17 +151,19 @@
var wg sync.WaitGroup
ctx := collector.args.Ctx.GetContext()
+
+ out:
for iterator.HasNext() {
select {
// canceled by user, stop
case <-ctx.Done():
err = ctx.Err()
- break
+ break out
// obtain a slot
case <-slotsChan:
input, err := iterator.Fetch()
if err != nil {
- break
+ break out
}
wg.Add(1)
go func() {
@@ -176,7 +181,7 @@
}
}()
case err = <-errors:
- break
+ break out
}
}
if err == nil {
@@ -425,4 +430,24 @@
return []json.RawMessage{body}, nil
}
+// GetRawMessageArrayFromResponse reads the whole body of res and decodes it as
+// a JSON array, returning one json.RawMessage per element. The body is always
+// closed before returning.
+//
+// An error is returned when res or its body is nil, when the body cannot be
+// read, or when it is not a JSON array. The request URL is appended to the
+// error text when the response carries one.
+func GetRawMessageArrayFromResponse(res *http.Response) ([]json.RawMessage, error) {
+	if res == nil {
+		return nil, fmt.Errorf("res is nil")
+	}
+	// A hand-built response may have no body; report it instead of panicking
+	// on the deferred Close.
+	if res.Body == nil {
+		return nil, fmt.Errorf("res body is nil")
+	}
+	defer res.Body.Close()
+
+	// Request (and its URL) may be unset on responses that were not produced
+	// by an HTTP client, so resolve the URL defensively for error messages.
+	requestURL := ""
+	if res.Request != nil && res.Request.URL != nil {
+		requestURL = res.Request.URL.String()
+	}
+
+	resBody, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		return nil, fmt.Errorf("%w %s", err, requestURL)
+	}
+
+	rawMessages := []json.RawMessage{}
+	if err := json.Unmarshal(resBody, &rawMessages); err != nil {
+		return nil, fmt.Errorf("%w %s %s", err, requestURL, string(resBody))
+	}
+
+	return rawMessages, nil
+}
+
var _ core.SubTask = (*ApiCollector)(nil)
diff --git a/plugins/helper/api_collector_test.go b/plugins/helper/api_collector_test.go
new file mode 100644
index 0000000..283ebf8
--- /dev/null
+++ b/plugins/helper/api_collector_test.go
@@ -0,0 +1,792 @@
+package helper
+
+import (
+ "bytes"
+ "context"
+ "database/sql"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "reflect"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/agiledragon/gomonkey/v2"
+ "github.com/merico-dev/lake/models/common"
+ "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/assert"
+ "gorm.io/gorm"
+)
+
+// Run these tests with `go test -gcflags=all=-l`: gomonkey patches only take
+// effect when compiler inlining is disabled.
+
+// TestTable is the row type used as collector input throughout these tests.
+// Its TableName method maps it to the "_tool_test" table.
+type TestTable struct {
+ Email string `gorm:"primaryKey;type:varchar(255)"`
+ Name string `gorm:"type:varchar(255)"`
+ common.NoPKModel
+}
+
+// TestTableData is the shared fixture row that the tests pass as collector
+// input and that the mocks compare against.
+var TestTableData = &TestTable{Email: "test@test.com", Name: "test"}
+
+// TestParam is the Params value handed to the collector under test; its Test
+// field is referenced by the URL template as {{ .Params.Test }}.
+type TestParam struct {
+ Test string
+}
+
+// TableName returns the table name used for TestTable rows.
+func (TestTable) TableName() string {
+ return "_tool_test"
+}
+
+// TestError is the sentinel error that mocks return and tests compare against.
+var TestError = fmt.Errorf("Error For Test")
+
+// Handles of the gorm.DB patches (Table, Create, Delete, AutoMigrate and
+// ScanRows respectively) so they can be reset after a test.
+var (
+	gt *gomonkey.Patches
+	gc *gomonkey.Patches
+	gd *gomonkey.Patches
+	ga *gomonkey.Patches
+	gs *gomonkey.Patches
+)
+
+// Pieces of the request URL: the template is TestUrlBefor + parameter +
+// TestUrlAfter, and TestUrl is the fully rendered result.
+var (
+	TestUrlBefor = "test1"
+	TestUrlParam = "test2"
+	TestUrlAfter = "test3"
+	TestUrl      = "https://" + TestUrlBefor + TestUrlParam + TestUrlAfter
+)
+
+// Miscellaneous string fixtures.
+var (
+	TestRawMessage    = "{\"message\":\"TestRawMessage\"}"
+	TestUrlValueKey   = "TestKey"
+	TestUrlValueValue = "TestValue"
+	TestNoRunHere     = "should not run to this line of code"
+)
+
+// Numeric fixtures used for paging and body sizes.
+var (
+	TestDataCount        = 100
+	TestTotalPage        = 100
+	TestDataCountNotFull = 50
+	TestPage             = 110
+	TestSkip             = 100100
+	TestSize             = 116102
+)
+
+// TestHttpResponse_Suc is the template for a successful (200) response.
+var TestHttpResponse_Suc = http.Response{
+	Status:     "200 OK",
+	StatusCode: 200,
+	Proto:      "HTTP/1.0",
+	ProtoMajor: 1,
+	ProtoMinor: 0,
+}
+
+// TestHttpResponse_404 is the template for a failing (404) response.
+var TestHttpResponse_404 = http.Response{
+	Status:     "404 Not Found",
+	StatusCode: 404,
+	Proto:      "HTTP/1.0",
+	ProtoMajor: 1,
+	ProtoMinor: 0,
+}
+
+// AssertBaseResponse asserts that the status-line fields (Status, StatusCode,
+// Proto, ProtoMajor, ProtoMinor) of A equal those of B.
+//
+// A is the response under test and B the expected fixture, so B is passed to
+// testify as the "expected" value and failure output is labelled correctly.
+func AssertBaseResponse(t *testing.T, A *http.Response, B *http.Response) {
+	assert.Equal(t, B.Status, A.Status)
+	assert.Equal(t, B.StatusCode, A.StatusCode)
+	assert.Equal(t, B.Proto, A.Proto)
+	assert.Equal(t, B.ProtoMajor, A.ProtoMajor)
+	assert.Equal(t, B.ProtoMinor, A.ProtoMinor)
+}
+
+// AddBodyData sets res.Body to a JSON array made of count copies of
+// TestRawMessage; a count of zero (or less) yields "[]".
+func AddBodyData(res *http.Response, count int) {
+	// Build into a buffer instead of repeated string concatenation.
+	var body bytes.Buffer
+	body.WriteByte('[')
+	for i := 0; i < count; i++ {
+		if i > 0 {
+			body.WriteByte(',')
+		}
+		body.WriteString(TestRawMessage)
+	}
+	body.WriteByte(']')
+	res.Body = ioutil.NopCloser(bytes.NewReader(body.Bytes()))
+}
+
+// SetUrl attaches a minimal request to res whose URL is parsed from rawURL.
+func SetUrl(res *http.Response, rawURL string) {
+	// The parse error is deliberately discarded: callers pass fixed, valid
+	// URLs. On a malformed rawURL the request is still attached, with a nil URL.
+	parsed, _ := url.Parse(rawURL)
+	req := new(http.Request)
+	req.URL = parsed
+	res.Request = req
+}
+
+// MockDB patches the gorm.DB methods Table, Create, Delete, AutoMigrate and
+// ScanRows so that no real database is touched. The patches are stored in the
+// package-level handles gt, gc, gd, ga and gs and must be released with
+// UnMockDB.
+//
+// The Table mock asserts the table name is TestTable's, and the Create mock
+// asserts the created value equals TestTableData; every mock returns the
+// receiver (or a nil error) so calls can be chained.
+func MockDB(t *testing.T) {
+	dbType := reflect.TypeOf(&gorm.DB{})
+
+	gt = gomonkey.ApplyMethod(dbType, "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
+		assert.Equal(t, TestTableData.TableName(), name)
+		return db
+	})
+
+	gc = gomonkey.ApplyMethod(dbType, "Create", func(db *gorm.DB, value interface{}) *gorm.DB {
+		// Compare through the interface value so that an unexpected type is
+		// reported as an assertion failure instead of a panic.
+		assert.Equal(t, TestTableData, value)
+		return db
+	})
+
+	gd = gomonkey.ApplyMethod(dbType, "Delete", func(db *gorm.DB, value interface{}, conds ...interface{}) (tx *gorm.DB) {
+		return db
+	})
+
+	ga = gomonkey.ApplyMethod(dbType, "AutoMigrate", func(db *gorm.DB, dst ...interface{}) error {
+		return nil
+	})
+
+	// ScanRows is a no-op: dest is left untouched (assigning to the parameter
+	// would not be visible to the caller anyway).
+	gs = gomonkey.ApplyMethod(dbType, "ScanRows", func(db *gorm.DB, rows *sql.Rows, dest interface{}) error {
+		return nil
+	})
+}
+
+// UnMockDB releases the patches installed by MockDB, in the order Table,
+// Create, Delete, AutoMigrate, ScanRows.
+func UnMockDB() {
+	for _, patch := range []*gomonkey.Patches{gt, gc, gd, ga, gs} {
+		patch.Reset()
+	}
+}
+
+// TestIterator is an in-memory iterator used as collector input. It hands out
+// copies of data while count is positive and records how often each of its
+// methods was called.
+type TestIterator struct {
+ data TestTable
+ count int
+ hasNextTimes int
+ fetchTimes int
+ closeTimes int
+}
+
+// HasNext records the call and reports whether any items are left to fetch.
+func (it *TestIterator) HasNext() bool {
+	it.hasNextTimes += 1
+	remaining := it.count
+	return remaining > 0
+}
+
+// Fetch records the call and returns a pointer to a fresh copy of the fixture
+// row, consuming one item. Once the iterator is exhausted it returns TestError.
+func (it *TestIterator) Fetch() (interface{}, error) {
+	it.fetchTimes += 1
+	if it.count <= 0 {
+		return nil, TestError
+	}
+	it.count -= 1
+	// Copy so every caller gets its own row.
+	row := it.data
+	return &row, nil
+}
+
+// Close records the call and always succeeds.
+func (it *TestIterator) Close() error {
+ it.closeTimes++
+ return nil
+}
+
+// CreateTestApiCollector builds an ApiCollector wired with test fixtures: an
+// empty gorm.DB, a 10 qps ApiAsyncClient, a page size of 100, a URL template
+// rendering to TestUrlBefor+TestUrlParam+TestUrlAfter, and Query/Header
+// builders that echo the JSON-encoded request input.
+func CreateTestApiCollector() (*ApiCollector, error) {
+	// Query builder: "Vjson" carries the encoded input, "Verr" the marshal
+	// error text (empty on success).
+	buildQuery := func(reqData *RequestData) (url.Values, error) {
+		values := url.Values{}
+		encoded, err := json.Marshal(reqData.Input)
+		values.Add("Vjson", string(encoded))
+		errText := ""
+		if err != nil {
+			errText = err.Error()
+		}
+		values.Add("Verr", errText)
+		return values, nil
+	}
+
+	// Header builder: same scheme as the query, under "Hjson"/"Herr".
+	buildHeader := func(reqData *RequestData) (http.Header, error) {
+		header := http.Header{}
+		encoded, err := json.Marshal(reqData.Input)
+		header.Add("Hjson", string(encoded))
+		errText := ""
+		if err != nil {
+			errText = err.Error()
+		}
+		header.Add("Herr", errText)
+		return header, nil
+	}
+
+	// Every response is reported as having TestTotalPage pages.
+	totalPages := func(res *http.Response, args *ApiCollectorArgs) (int, error) {
+		return TestTotalPage, nil
+	}
+
+	execCtx := newDefaultExecContext(GetConfigForTest("../../"), NewDefaultLogger(logrus.New(), "Test"), &gorm.DB{}, context.Background(), "Test", nil, nil)
+
+	args := ApiCollectorArgs{
+		RawDataSubTaskArgs: RawDataSubTaskArgs{
+			Ctx:    &DefaultSubTaskContext{defaultExecContext: execCtx},
+			Table:  TestTable{}.TableName(),
+			Params: &TestParam{Test: TestUrlParam},
+		},
+		ApiClient:      &ApiAsyncClient{qps: 10},
+		PageSize:       100,
+		Incremental:    false,
+		UrlTemplate:    TestUrlBefor + "{{ .Params.Test }}" + TestUrlAfter,
+		Query:          buildQuery,
+		Header:         buildHeader,
+		GetTotalPages:  totalPages,
+		ResponseParser: GetRawMessageArrayFromResponse,
+	}
+	return NewApiCollector(args)
+}
+
+// TestGormDB is a smoke test for MockDB: chained Table/Create/Delete calls
+// must run against the mocks, and the terminal AutoMigrate and ScanRows calls
+// must report no error.
+func TestGormDB(t *testing.T) {
+	MockDB(t)
+	defer UnMockDB()
+
+	// A distinct value equal to TestTableData, so the Create mock's equality
+	// check is exercised by value rather than by pointer identity.
+	ts := &TestTable{
+		Email: TestTableData.Email,
+		Name:  TestTableData.Name,
+	}
+	db := &gorm.DB{}
+
+	assert.NoError(t, db.Table(ts.TableName()).Create(ts).Delete(ts).AutoMigrate())
+	assert.NoError(t, db.Table(ts.TableName()).Create(ts).Delete(ts).ScanRows(nil, nil))
+}
+
+// TestSaveRawData checks that saveRawData writes one RawData row per item of a
+// successful response into the "_raw_" table, carrying the encoded params, the
+// raw item, the request URL and the encoded input.
+func TestSaveRawData(t *testing.T) {
+	dbType := reflect.TypeOf(&gorm.DB{})
+
+	gt = gomonkey.ApplyMethod(dbType, "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
+		assert.Equal(t, "_raw_"+TestTableData.TableName(), name)
+		return db
+	})
+	defer gt.Reset()
+
+	// Expected encodings, computed once instead of on every Create call.
+	wantParams, err := json.Marshal(&TestParam{Test: TestUrlParam})
+	assert.NoError(t, err)
+	wantInput, err := json.Marshal(TestTableData)
+	assert.NoError(t, err)
+
+	gc = gomonkey.ApplyMethod(dbType, "Create", func(db *gorm.DB, value interface{}) *gorm.DB {
+		rows, ok := value.([]*RawData)
+		if !assert.True(t, ok, "Create received %T, want []*RawData", value) {
+			return db
+		}
+		for _, row := range rows {
+			// check data and url
+			assert.Equal(t, string(wantParams), row.Params)
+			assert.Equal(t, TestRawMessage, row.Data.String())
+			assert.Equal(t, TestUrl, row.Url)
+			assert.Equal(t, string(wantInput), row.Input.String())
+		}
+		return db
+	})
+	defer gc.Reset()
+
+	apiCollector, err := CreateTestApiCollector()
+	if err != nil {
+		t.Fatalf("CreateTestApiCollector: %v", err)
+	}
+
+	// build data and url on a private copy of the response template
+	resBase := TestHttpResponse_Suc
+	res := &resBase
+	AddBodyData(res, TestDataCount)
+	SetUrl(res, TestUrl)
+
+	// run testing
+	count, err := apiCollector.saveRawData(res, TestTableData)
+	assert.Equal(t, TestDataCount, count)
+	assert.NoError(t, err)
+}
+
+// TestSaveRawData_Fail checks that saveRawData saves nothing for a 404
+// response with an empty item array: it must return (0, nil) without calling
+// Table or Create.
+func TestSaveRawData_Fail(t *testing.T) {
+	dbType := reflect.TypeOf(&gorm.DB{})
+
+	gt = gomonkey.ApplyMethod(dbType, "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
+		assert.Fail(t, TestNoRunHere)
+		return db
+	})
+	defer gt.Reset()
+
+	gc = gomonkey.ApplyMethod(dbType, "Create", func(db *gorm.DB, value interface{}) *gorm.DB {
+		assert.Fail(t, TestNoRunHere)
+		return db
+	})
+	defer gc.Reset()
+
+	apiCollector, err := CreateTestApiCollector()
+	if err != nil {
+		t.Fatalf("CreateTestApiCollector: %v", err)
+	}
+
+	// build data and url on a private copy of the response template
+	resBase := TestHttpResponse_404
+	res := &resBase
+	AddBodyData(res, 0)
+	SetUrl(res, TestUrl)
+
+	// run testing
+	count, err := apiCollector.saveRawData(res, TestTableData)
+	assert.Equal(t, 0, count)
+	assert.NoError(t, err)
+}
+
+// TestHandleResponse checks that the callback built by handleResponse passes
+// the response and the request input on to saveRawData and returns nil when
+// saving succeeds.
+func TestHandleResponse(t *testing.T) {
+	apiCollector, err := CreateTestApiCollector()
+	if err != nil {
+		t.Fatalf("CreateTestApiCollector: %v", err)
+	}
+
+	savePatch := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
+		items, err := collector.args.ResponseParser(res)
+		assert.NoError(t, err)
+		// check items data
+		for _, item := range items {
+			encoded, err := json.Marshal(item)
+			assert.NoError(t, err)
+			assert.Equal(t, TestRawMessage, string(encoded))
+		}
+		assert.Equal(t, TestTableData, input)
+		AssertBaseResponse(t, res, &TestHttpResponse_Suc)
+		return len(items), nil
+	})
+	defer savePatch.Reset()
+
+	// build request input
+	reqData := &RequestData{Input: TestTableData}
+	handle := apiCollector.handleResponse(reqData)
+
+	// build data and url on a private copy of the response template
+	resBase := TestHttpResponse_Suc
+	res := &resBase
+	AddBodyData(res, TestDataCount)
+	SetUrl(res, TestUrl)
+
+	// run testing
+	assert.NoError(t, handle(res, nil))
+}
+
+// TestHandleResponse_Fail checks that the callback built by handleResponse
+// returns the error reported by saveRawData.
+func TestHandleResponse_Fail(t *testing.T) {
+	apiCollector, err := CreateTestApiCollector()
+	if err != nil {
+		t.Fatalf("CreateTestApiCollector: %v", err)
+	}
+
+	savePatch := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
+		items, err := collector.args.ResponseParser(res)
+		assert.NoError(t, err)
+		for _, item := range items {
+			encoded, err := json.Marshal(item)
+			assert.NoError(t, err)
+			assert.Equal(t, TestRawMessage, string(encoded))
+		}
+		assert.Equal(t, TestTableData, input)
+		AssertBaseResponse(t, res, &TestHttpResponse_404)
+		return len(items), TestError
+	})
+	defer savePatch.Reset()
+
+	// build request input
+	reqData := &RequestData{Input: TestTableData}
+	handle := apiCollector.handleResponse(reqData)
+
+	// build data and url on a private copy of the response template
+	resBase := TestHttpResponse_404
+	res := &resBase
+	AddBodyData(res, 0)
+	SetUrl(res, TestUrl)
+
+	// run testing
+	assert.Equal(t, TestError, handle(res, nil))
+}
+
+// TestFetchAsync checks that fetchAsync hands the rendered URL, query and
+// header to ApiAsyncClient.GetAsync, and returns nil when GetAsync succeeds
+// even though the response callback itself reports TestError.
+func TestFetchAsync(t *testing.T) {
+	apiCollector, err := CreateTestApiCollector()
+	if err != nil {
+		t.Fatalf("CreateTestApiCollector: %v", err)
+	}
+
+	gg := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "GetAsync", func(apiAsyncClient *ApiAsyncClient, path string, query url.Values, header http.Header, handler ApiAsyncCallback) error {
+		assert.Equal(t, TestUrlBefor+TestUrlParam+TestUrlAfter, path)
+
+		// Mirror the Query/Header builders of CreateTestApiCollector: the
+		// encoded input plus the marshal error text (empty on success).
+		encoded, marshalErr := json.Marshal(TestTableData)
+		wantErrText := ""
+		if marshalErr != nil {
+			wantErrText = marshalErr.Error()
+		}
+		assert.Equal(t, string(encoded), query.Get("Vjson"))
+		assert.Equal(t, string(encoded), header.Get("Hjson"))
+		assert.Equal(t, wantErrText, query.Get("Verr"))
+		assert.Equal(t, wantErrText, header.Get("Herr"))
+
+		res := TestHttpResponse_Suc
+		// The callback's result is intentionally dropped: this mock reports
+		// success regardless of what the handler returns.
+		_ = handler(&res, TestError)
+		return nil
+	})
+	defer gg.Reset()
+
+	// build request input
+	reqData := &RequestData{Input: TestTableData}
+
+	// run testing
+	err = apiCollector.fetchAsync(reqData, func(r *http.Response, err error) error {
+		AssertBaseResponse(t, r, &TestHttpResponse_Suc)
+		assert.Equal(t, TestError, err)
+		return err
+	})
+	assert.NoError(t, err)
+}
+
+// TestFetchAsync_Fail checks that fetchAsync returns the error reported by
+// ApiAsyncClient.GetAsync.
+func TestFetchAsync_Fail(t *testing.T) {
+	apiCollector, err := CreateTestApiCollector()
+	if err != nil {
+		t.Fatalf("CreateTestApiCollector: %v", err)
+	}
+
+	gg := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "GetAsync", func(apiAsyncClient *ApiAsyncClient, path string, query url.Values, header http.Header, handler ApiAsyncCallback) error {
+		assert.Equal(t, TestUrlBefor+TestUrlParam+TestUrlAfter, path)
+
+		// Mirror the Query/Header builders of CreateTestApiCollector: the
+		// encoded input plus the marshal error text (empty on success).
+		encoded, marshalErr := json.Marshal(TestTableData)
+		wantErrText := ""
+		if marshalErr != nil {
+			wantErrText = marshalErr.Error()
+		}
+		assert.Equal(t, string(encoded), query.Get("Vjson"))
+		assert.Equal(t, string(encoded), header.Get("Hjson"))
+		assert.Equal(t, wantErrText, query.Get("Verr"))
+		assert.Equal(t, wantErrText, header.Get("Herr"))
+
+		res := TestHttpResponse_404
+		// The callback's result is intentionally dropped: this mock always
+		// fails with TestError itself.
+		_ = handler(&res, TestError)
+		return TestError
+	})
+	defer gg.Reset()
+
+	// build request input
+	reqData := &RequestData{Input: TestTableData}
+
+	// run testing
+	err = apiCollector.fetchAsync(reqData, func(r *http.Response, err error) error {
+		AssertBaseResponse(t, r, &TestHttpResponse_404)
+		assert.Equal(t, TestError, err)
+		return err
+	})
+	assert.Equal(t, TestError, err)
+}
+
+// TestHandleResponseWithPages checks that the callback built by
+// handleResponseWithPages saves the first response and then requests every
+// page from 2 up to the total reported by GetTotalPages.
+func TestHandleResponseWithPages(t *testing.T) {
+	apiCollector, err := CreateTestApiCollector()
+	if err != nil {
+		t.Fatalf("CreateTestApiCollector: %v", err)
+	}
+
+	// pages[i] becomes true once page i was requested; make zero-initialises it.
+	pages := make([]bool, TestTotalPage+1)
+
+	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
+		pages[reqData.Pager.Page] = true
+		return nil
+	})
+	defer gf.Reset()
+
+	savePatch := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
+		items, err := collector.args.ResponseParser(res)
+		assert.NoError(t, err)
+		for _, item := range items {
+			encoded, err := json.Marshal(item)
+			assert.NoError(t, err)
+			assert.Equal(t, TestRawMessage, string(encoded))
+		}
+		assert.Equal(t, TestTableData, input)
+		AssertBaseResponse(t, res, &TestHttpResponse_Suc)
+		return len(items), nil
+	})
+	defer savePatch.Reset()
+
+	// build request input
+	reqData := &RequestData{Input: TestTableData}
+	handle := apiCollector.handleResponseWithPages(reqData)
+
+	// build data and url on a private copy of the response template
+	resBase := TestHttpResponse_Suc
+	res := &resBase
+	AddBodyData(res, TestDataCount)
+	SetUrl(res, TestUrl)
+
+	// run testing
+	assert.NoError(t, handle(res, nil))
+	// Page 1 is the response handled above, so only pages 2..total are fetched.
+	for i := 2; i <= TestTotalPage; i++ {
+		assert.True(t, pages[i], i)
+	}
+}
+
+// TestHandleResponseWithPages_Fail checks that the callback built by
+// handleResponseWithPages returns TestError when both saveRawData and
+// fetchAsync fail with it.
+func TestHandleResponseWithPages_Fail(t *testing.T) {
+	apiCollector, err := CreateTestApiCollector()
+	if err != nil {
+		t.Fatalf("CreateTestApiCollector: %v", err)
+	}
+
+	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
+		return TestError
+	})
+	defer gf.Reset()
+
+	savePatch := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
+		items, err := collector.args.ResponseParser(res)
+		assert.NoError(t, err)
+		for _, item := range items {
+			encoded, err := json.Marshal(item)
+			assert.NoError(t, err)
+			assert.Equal(t, TestRawMessage, string(encoded))
+		}
+		assert.Equal(t, TestTableData, input)
+		AssertBaseResponse(t, res, &TestHttpResponse_404)
+		return len(items), TestError
+	})
+	defer savePatch.Reset()
+
+	// build request input
+	reqData := &RequestData{Input: TestTableData}
+	handle := apiCollector.handleResponseWithPages(reqData)
+
+	// build data and url on a private copy of the response template
+	resBase := TestHttpResponse_404
+	res := &resBase
+	AddBodyData(res, 0)
+	SetUrl(res, TestUrl)
+
+	// run testing
+	assert.Equal(t, TestError, handle(res, nil))
+}
+
+// TestStepFetch checks that stepFetch starts from the given pager, keeps
+// fetching while pages come back full, and stops without error after the
+// first page that is not full.
+func TestStepFetch(t *testing.T) {
+	apiCollector, err := CreateTestApiCollector()
+	if err != nil {
+		t.Fatalf("CreateTestApiCollector: %v", err)
+	}
+
+	firstPageFetched := false
+	noFullTimes := 0
+
+	savePatch := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
+		items, err := collector.args.ResponseParser(res)
+		assert.NoError(t, err)
+		for _, item := range items {
+			encoded, err := json.Marshal(item)
+			assert.NoError(t, err)
+			assert.Equal(t, TestRawMessage, string(encoded))
+		}
+		assert.Equal(t, TestTableData, input)
+		AssertBaseResponse(t, res, &TestHttpResponse_Suc)
+		return len(items), nil
+	})
+	defer savePatch.Reset()
+
+	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
+		resBase := TestHttpResponse_Suc
+		res := &resBase
+		SetUrl(res, TestUrl)
+
+		if reqData.Pager.Page == TestPage {
+			// full page for continue
+			firstPageFetched = true
+			assert.Equal(t, TestSkip, reqData.Pager.Skip)
+			assert.Equal(t, TestSize, reqData.Pager.Size)
+			AddBodyData(res, TestDataCount)
+		} else {
+			// not full page for stop
+			AddBodyData(res, TestDataCountNotFull)
+			noFullTimes++
+		}
+
+		go handler(res, nil)
+		return nil
+	})
+	defer gf.Reset()
+
+	// build request input
+	reqData := RequestData{
+		Input: TestTableData,
+		Pager: &Pager{
+			Page: TestPage,
+			Skip: TestSkip,
+			Size: TestSize,
+		},
+	}
+
+	// stepFetch receives cancel to abort on error; the test still releases the
+	// context itself so it is not leaked on the success path.
+	ctx, cancel := context.WithCancel(apiCollector.args.Ctx.GetContext())
+	defer cancel()
+
+	// run testing
+	err = apiCollector.stepFetch(ctx, cancel, reqData)
+
+	assert.Equal(t, 1, noFullTimes)
+	assert.True(t, firstPageFetched)
+	assert.NoError(t, err)
+}
+
+// TestStepFetch_Fail checks that stepFetch returns the error of a failing
+// fetchAsync call: the first page succeeds and is full, the following request
+// fails with TestError.
+func TestStepFetch_Fail(t *testing.T) {
+	apiCollector, err := CreateTestApiCollector()
+	if err != nil {
+		t.Fatalf("CreateTestApiCollector: %v", err)
+	}
+
+	firstPageFetched := false
+	noFullTimes := 0
+
+	savePatch := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
+		items, err := collector.args.ResponseParser(res)
+		assert.NoError(t, err)
+		for _, item := range items {
+			encoded, err := json.Marshal(item)
+			assert.NoError(t, err)
+			assert.Equal(t, TestRawMessage, string(encoded))
+		}
+		assert.Equal(t, TestTableData, input)
+		AssertBaseResponse(t, res, &TestHttpResponse_404)
+		return len(items), nil
+	})
+	defer savePatch.Reset()
+
+	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
+		resBase := TestHttpResponse_404
+		res := &resBase
+		SetUrl(res, TestUrl)
+
+		if reqData.Pager.Page != TestPage {
+			// not full page for stop: fail the request itself
+			AddBodyData(res, TestDataCountNotFull)
+			noFullTimes++
+			return TestError
+		}
+
+		// full page for continue
+		firstPageFetched = true
+		assert.Equal(t, TestSkip, reqData.Pager.Skip)
+		assert.Equal(t, TestSize, reqData.Pager.Size)
+		AddBodyData(res, TestDataCount)
+		go handler(res, nil)
+		return nil
+	})
+	defer gf.Reset()
+
+	// build request input
+	reqData := RequestData{
+		Input: TestTableData,
+		Pager: &Pager{
+			Page: TestPage,
+			Skip: TestSkip,
+			Size: TestSize,
+		},
+	}
+
+	// stepFetch receives cancel to abort on error; the test still releases the
+	// context itself (calling cancel more than once is harmless).
+	ctx, cancel := context.WithCancel(apiCollector.args.Ctx.GetContext())
+	defer cancel()
+
+	// run testing
+	err = apiCollector.stepFetch(ctx, cancel, reqData)
+
+	assert.Equal(t, 1, noFullTimes)
+	assert.True(t, firstPageFetched)
+	assert.Equal(t, TestError, err)
+}
+
+// TestExecWithOutPageSize checks that exec with a zero page size issues a
+// fetchAsync for the input and never falls into stepFetch.
+func TestExecWithOutPageSize(t *testing.T) {
+	apiCollector, err := CreateTestApiCollector()
+	if err != nil {
+		t.Fatalf("CreateTestApiCollector: %v", err)
+	}
+	apiCollector.args.PageSize = 0
+
+	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
+		assert.Equal(t, TestTableData, reqData.Input)
+		return nil
+	})
+	defer gf.Reset()
+
+	stepPatch := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "stepFetch", func(collector *ApiCollector, ctx context.Context, cancel func(), reqData RequestData) error {
+		assert.Fail(t, TestNoRunHere)
+		return TestError
+	})
+	defer stepPatch.Reset()
+
+	// run testing
+	assert.NoError(t, apiCollector.exec(TestTableData))
+}
+
+// TestExecWithGetTotalPages checks that exec, when GetTotalPages is
+// configured, issues a fetchAsync for the input and never falls into
+// stepFetch.
+func TestExecWithGetTotalPages(t *testing.T) {
+	apiCollector, err := CreateTestApiCollector()
+	if err != nil {
+		t.Fatalf("CreateTestApiCollector: %v", err)
+	}
+
+	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
+		assert.Equal(t, TestTableData, reqData.Input)
+		return nil
+	})
+	defer gf.Reset()
+
+	stepPatch := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "stepFetch", func(collector *ApiCollector, ctx context.Context, cancel func(), reqData RequestData) error {
+		assert.Fail(t, TestNoRunHere)
+		return TestError
+	})
+	defer stepPatch.Reset()
+
+	// run testing
+	assert.NoError(t, apiCollector.exec(TestTableData))
+}
+
+// TestExecWithOutGetTotalPages checks that exec without GetTotalPages
+// delegates to stepFetch with pagers whose Size is the page size and whose
+// Skip is PageSize*(page-1), and that pages 2..TestTotalPage are all started
+// when Concurrency is TestTotalPage.
+func TestExecWithOutGetTotalPages(t *testing.T) {
+	apiCollector, err := CreateTestApiCollector()
+	if err != nil {
+		t.Fatalf("CreateTestApiCollector: %v", err)
+	}
+	apiCollector.args.GetTotalPages = nil
+	apiCollector.args.Concurrency = TestTotalPage
+
+	// pages[i] becomes true once stepFetch was started for page i; make
+	// zero-initialises it.
+	pages := make([]bool, TestTotalPage+1)
+
+	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
+		assert.Equal(t, TestTableData, reqData.Input)
+		return nil
+	})
+	defer gf.Reset()
+
+	stepPatch := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "stepFetch", func(collector *ApiCollector, ctx context.Context, cancel func(), reqData RequestData) error {
+		assert.Equal(t, TestTableData, reqData.Input)
+		page := reqData.Pager.Page
+		pages[page] = true
+		assert.Equal(t, apiCollector.args.PageSize, reqData.Pager.Size)
+		assert.Equal(t, apiCollector.args.PageSize*(page-1), reqData.Pager.Skip)
+		return nil
+	})
+	defer stepPatch.Reset()
+
+	// run testing
+	assert.NoError(t, apiCollector.exec(TestTableData))
+
+	for i := 2; i <= TestTotalPage; i++ {
+		assert.True(t, pages[i], i)
+	}
+}
+
+// TestExecute checks that Execute drains the input iterator, calls exec once
+// per fetched item with that item, and closes the iterator afterwards.
+func TestExecute(t *testing.T) {
+	MockDB(t)
+	defer UnMockDB()
+
+	apiCollector, err := CreateTestApiCollector()
+	if err != nil {
+		t.Fatalf("CreateTestApiCollector: %v", err)
+	}
+
+	iterator := &TestIterator{
+		data:  *TestTableData,
+		count: TestDataCount,
+	}
+	apiCollector.args.Input = iterator
+
+	// Replace MockDB's Table patch: here the "_raw_"-prefixed table name is
+	// expected instead of the plain one.
+	gt.Reset()
+	gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
+		assert.Equal(t, "_raw_"+TestTableData.TableName(), name)
+		return db
+	})
+	defer gt.Reset()
+
+	// Both counters are touched from the exec mock, which may run on several
+	// goroutines at once, so they are only accessed atomically.
+	var inFlight, execTimes int64
+
+	ge := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "exec", func(collector *ApiCollector, input interface{}) error {
+		atomic.AddInt64(&inFlight, 1)
+		defer atomic.AddInt64(&inFlight, -1)
+		atomic.AddInt64(&execTimes, 1)
+
+		// Report an unexpected input type as a failure instead of panicking.
+		row, ok := input.(*TestTable)
+		if assert.True(t, ok, "exec received %T, want *TestTable", input) {
+			assert.Equal(t, TestTableData.Email, row.Email)
+			assert.Equal(t, TestTableData.Name, row.Name)
+		}
+		return nil
+	})
+	defer ge.Reset()
+
+	// WaitAsync blocks until no exec mock is running any more.
+	gw := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "WaitAsync", func(apiClient *ApiAsyncClient) error {
+		for atomic.LoadInt64(&inFlight) > 0 {
+			time.Sleep(time.Millisecond)
+		}
+		return nil
+	})
+	defer gw.Reset()
+
+	// run testing
+	assert.NoError(t, apiCollector.Execute())
+	assert.Equal(t, int64(TestDataCount), atomic.LoadInt64(&execTimes))
+
+	assert.Equal(t, TestDataCount, iterator.fetchTimes)
+	assert.True(t, iterator.hasNextTimes >= iterator.fetchTimes)
+	assert.True(t, iterator.closeTimes > 0)
+}