package helper

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"time"

	"github.com/merico-dev/lake/plugins/core"
	"github.com/merico-dev/lake/utils"
)

type ApiAsyncCallback func(*http.Response, error) error

var HttpMinStatusRetryCode = http.StatusBadRequest

// ApiAsyncClient is built on top of ApiClient to provide asynchronous semantics.
// You may submit multiple requests at once by calling `GetAsync`; those requests
// will be performed in parallel with rate-limit support. See the usage sketch
// after the struct definition below.
type ApiAsyncClient struct {
	*ApiClient
	maxRetry  int
	scheduler *WorkerScheduler
	qps       float64
}
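
// A minimal usage sketch, assuming a plugin already holds a configured *ApiClient
// and a core.TaskContext (collectSomething and the "issues" path are hypothetical,
// not part of this package):
//
//	func collectSomething(taskCtx core.TaskContext, apiClient *ApiClient) error {
//		asyncClient, err := CreateAsyncApiClient(taskCtx, apiClient, nil)
//		if err != nil {
//			return err
//		}
//		err = asyncClient.GetAsync("issues", nil, nil, func(res *http.Response, err error) error {
//			if err != nil {
//				return err
//			}
//			// decode res.Body and persist the records here
//			return nil
//		})
//		if err != nil {
//			return err
//		}
//		// block until every enqueued request has been handled
//		return asyncClient.WaitAsync()
//	}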

func CreateAsyncApiClient(
	taskCtx core.TaskContext,
	apiClient *ApiClient,
	rateLimiter *ApiRateLimitCalculator,
) (*ApiAsyncClient, error) {
	// load retry/timeout from configuration
	retry, err := utils.StrToIntOr(taskCtx.GetConfig("API_RETRY"), 3)
	if err != nil {
		return nil, fmt.Errorf("failed to parse API_RETRY: %w", err)
	}
	timeout, err := utils.StrToDurationOr(taskCtx.GetConfig("API_TIMEOUT"), 10*time.Second)
	if err != nil {
		return nil, fmt.Errorf("failed to parse API_TIMEOUT: %w", err)
	}
	apiClient.SetTimeout(timeout)
	apiClient.SetLogger(taskCtx.GetLogger())
	globalRateLimitPerHour, err := utils.StrToIntOr(taskCtx.GetConfig("API_REQUESTS_PER_HOUR"), 18000)
	if err != nil {
		return nil, fmt.Errorf("failed to parse API_REQUESTS_PER_HOUR: %w", err)
	}
	if rateLimiter == nil {
		rateLimiter = &ApiRateLimitCalculator{}
	}
	rateLimiter.GlobalRateLimitPerHour = globalRateLimitPerHour
	rateLimiter.MaxRetry = retry
	// calculate the api rate limit based on the response (normally from headers)
	requests, duration, err := rateLimiter.Calculate(apiClient)
	if err != nil {
		return nil, fmt.Errorf("failed to calculate rateLimit for api: %w", err)
	}
	// It is hard to tell how many workers would be sufficient; it depends on how slowly
	// the server responds. We need more workers when the server responds slowly, because
	// requests are sent at a fixed pace. Since workers are relatively cheap, assume each
	// response takes 5 seconds.
	const RESPONSE_TIME = 5 * time.Second
	// in order for the scheduler to keep RESPONSE_TIME worth of requests in flight, we need:
	d := duration / RESPONSE_TIME
	numOfWorkers := requests / int(d)
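	// For example (assuming the calculator falls back to the global default of
	// 18000 requests per hour): requests=18000, duration=1h, so
	// d = 3600s / 5s = 720 and numOfWorkers = 18000 / 720 = 25, i.e. roughly the
	// number of requests issued during one assumed RESPONSE_TIME window.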
	taskCtx.GetLogger().Info(
		"scheduler for api %s worker: %d, request: %d, duration: %v",
		apiClient.GetEndpoint(),
		numOfWorkers,
		requests,
		duration,
	)
	scheduler, err := NewWorkerScheduler(numOfWorkers, requests, duration, taskCtx.GetContext(), retry)
	if err != nil {
		return nil, fmt.Errorf("failed to create scheduler: %w", err)
	}
	qps := float64(requests) / duration.Seconds()
	// finally, wrap the api client with async semantics
	return &ApiAsyncClient{
		apiClient,
		retry,
		scheduler,
		qps,
	}, nil
}

func (apiClient *ApiAsyncClient) DoAsync(
	method string,
	path string,
	query url.Values,
	body interface{},
	header http.Header,
	handler ApiAsyncCallback,
	retry int,
) error {
	var subFunc func() error
	subFunc = func() error {
		var err error
		var res *http.Response
		var resBody []byte
		res, err = apiClient.Do(method, path, query, body, header)
		if err == nil {
			// drain the response body so the connection can be reused, then restore it for the handler
			resBody, err = io.ReadAll(res.Body)
			res.Body.Close()
			res.Body = io.NopCloser(bytes.NewBuffer(resBody))
		}
		// check whether the request needs to be retried
		needRetry := false
		if err != nil {
			needRetry = true
		} else if res.StatusCode >= HttpMinStatusRetryCode {
			needRetry = true
			err = fmt.Errorf("http code error[%d]:[%s]", res.StatusCode, resBody)
		}
		// if a retry is needed, check whether we still have retries left and the error is not a cancellation
		if needRetry {
			if retry < apiClient.maxRetry && err != context.Canceled {
				apiClient.logError("retry #%d for %s", retry, err.Error())
				retry++
				return apiClient.scheduler.Submit(subFunc, apiClient.scheduler.subPool)
			}
			return err
		}
		if err == nil {
			if res.StatusCode >= 400 {
				err = fmt.Errorf("http code error[%d]:[%s]", res.StatusCode, resBody)
			}
		}
		// it is important to give the handler a chance to handle the error,
		// otherwise the pipeline can hang indefinitely when an error occurs
		return handler(res, err)
	}
	return apiClient.scheduler.Submit(subFunc)
}

// GetAsync enqueues an api GET request; the request may be sent some time in the future,
// in parallel with other api requests
func (apiClient *ApiAsyncClient) GetAsync(
	path string,
	query url.Values,
	header http.Header,
	handler ApiAsyncCallback,
) error {
	return apiClient.DoAsync(http.MethodGet, path, query, nil, header, handler, 0)
}

// WaitAsync blocks until all enqueued async requests have finished
func (apiClient *ApiAsyncClient) WaitAsync() error {
	return apiClient.scheduler.WaitUntilFinish()
}

func (apiClient *ApiAsyncClient) GetQps() float64 {
	return apiClient.qps
}

type RateLimitedApiClient interface {
	GetAsync(path string, query url.Values, header http.Header, handler ApiAsyncCallback) error
	WaitAsync() error
	GetQps() float64
}

// compile-time check that ApiAsyncClient satisfies RateLimitedApiClient
var _ RateLimitedApiClient = (*ApiAsyncClient)(nil)
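
// Depending on the interface rather than on *ApiAsyncClient keeps collectors easy to
// fake in tests. A hypothetical consumer (runCollection and the "issues" path are
// illustrative, not part of this package) only needs the three interface methods:
//
//	func runCollection(client RateLimitedApiClient, handler ApiAsyncCallback) error {
//		if err := client.GetAsync("issues", nil, nil, handler); err != nil {
//			return err
//		}
//		return client.WaitAsync()
//	}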