// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apisix

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"strings"
	"sync/atomic"
	"time"

	"go.uber.org/multierr"
	"go.uber.org/zap"
	"k8s.io/apimachinery/pkg/util/wait"

	"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
	"github.com/apache/apisix-ingress-controller/pkg/log"
	"github.com/apache/apisix-ingress-controller/pkg/metrics"
	"github.com/apache/apisix-ingress-controller/pkg/types"
)

const (
	// Defaults applied by newCluster when the corresponding ClusterOptions
	// field is left at its zero value: _defaultTimeout for the HTTP client
	// timeout and _defaultSyncInterval for the schema sync period.
	_defaultTimeout      = 5 * time.Second
	_defaultSyncInterval = 6 * time.Hour

	// Cache sync states stored in cluster.cacheState. iota counts every
	// ConstSpec in this block, so _cacheSyncing is 2 and _cacheSynced is 3
	// (not 0 and 1); a zero-valued cacheState therefore matches neither
	// state and must be initialised explicitly.
	_cacheSyncing = iota
	_cacheSynced
)

var (
	// ErrClusterNotExist means a cluster doesn't exist.
	ErrClusterNotExist = errors.New("client not exist")
	// ErrDuplicatedCluster means the cluster adding request was
	// rejected since the cluster was already created.
	ErrDuplicatedCluster = errors.New("duplicated cluster")
	// ErrFunctionDisabled means the APISIX function is disabled
	ErrFunctionDisabled = errors.New("function disabled")

	// _errReadOnClosedResBody carries the message drainBody compares against
	// (by string) to recognise a read on an already closed response body.
	_errReadOnClosedResBody = errors.New("http: read on closed response body")

	// Default shared transport for apisix client.
	//
	// Only DialContext is configured: the Dial field is deprecated and is
	// ignored by net/http whenever DialContext is set, so specifying both
	// was redundant.
	_defaultTransport = &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout: 3 * time.Second,
		}).DialContext,
		ResponseHeaderTimeout: 30 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
	}
)

// ClusterOptions contains parameters to customize APISIX client.
//
// How newCluster treats the fields:
//   - AdminAPIVersion: any value other than "v3" falls back to "v2".
//   - Name: cluster name, used in log fields and in String().
//   - AdminKey: when non-empty it is sent as the X-API-Key request header.
//   - BaseURL: required; a single trailing "/" is trimmed and the value is
//     parsed as a URL.
//   - Timeout: HTTP client timeout; zero selects _defaultTimeout.
//   - SyncInterval: zero selects _defaultSyncInterval.
//   - MetricsCollector: invoked without a nil check throughout this file, so
//     callers presumably must supply one (NOTE(review): confirm).
//
// newCluster writes the defaulted/normalised Timeout, SyncInterval and BaseURL
// back into the options value it is given.
type ClusterOptions struct {
	AdminAPIVersion string
	Name            string
	AdminKey        string
	BaseURL         string
	Timeout         time.Duration
	// SyncInterval is the interval to sync schema.
	SyncInterval     types.TimeDuration
	MetricsCollector metrics.Collector
}

// cluster is the Cluster implementation built by newCluster.
//
// Field notes:
//   - adminVersion is "v2" or "v3" and selects the response shape that the
//     resource helpers decode.
//   - baseURLHost is the host part of baseURL; healthCheck dials it over TCP.
//   - cacheState holds _cacheSyncing or _cacheSynced and is accessed with
//     sync/atomic.
//   - cacheSynced is closed by syncCache once the initial cache sync attempt
//     has finished, whether it succeeded or not.
//   - cacheSyncErr is written by syncCache and HealthCheck and read by
//     HasSynced and HealthCheck without additional locking.
//   - route ... pluginMetadata are the per-resource clients returned by the
//     accessor methods of the same name.
type cluster struct {
	adminVersion            string
	name                    string
	baseURL                 string
	baseURLHost             string
	adminKey                string
	cli                     *http.Client
	cacheState              int32
	cache                   cache.Cache
	cacheSynced             chan struct{}
	cacheSyncErr            error
	route                   Route
	upstream                Upstream
	ssl                     SSL
	streamRoute             StreamRoute
	globalRules             GlobalRule
	consumer                Consumer
	plugin                  Plugin
	schema                  Schema
	pluginConfig            PluginConfig
	metricsCollector        metrics.Collector
	upstreamServiceRelation UpstreamServiceRelation
	pluginMetadata          PluginMetadata
}

// newCluster builds a cluster client from o and starts its background work.
//
// It rejects an empty BaseURL, fills in the default timeout and sync interval
// when they are zero, trims one trailing "/" from BaseURL (all written back
// into o), and returns an error if the URL cannot be parsed or the in-memory
// cache cannot be created. On success two goroutines are started with ctx:
// the initial cache sync and the periodic schema sync.
func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
	if o.BaseURL == "" {
		return nil, errors.New("empty base url")
	}

	// Normalise the options in place.
	if o.Timeout == 0 {
		o.Timeout = _defaultTimeout
	}
	if o.SyncInterval.Duration == 0 {
		o.SyncInterval = types.TimeDuration{Duration: _defaultSyncInterval}
	}
	o.BaseURL = strings.TrimSuffix(o.BaseURL, "/")

	parsed, err := url.Parse(o.BaseURL)
	if err != nil {
		return nil, err
	}

	// Only "v3" is honoured; every other value means "v2".
	apiVersion := "v2"
	if o.AdminAPIVersion == "v3" {
		apiVersion = "v3"
	}

	httpClient := &http.Client{
		Timeout:   o.Timeout,
		Transport: _defaultTransport,
	}
	c := &cluster{
		adminVersion:     apiVersion,
		name:             o.Name,
		baseURL:          o.BaseURL,
		baseURLHost:      parsed.Host,
		adminKey:         o.AdminKey,
		cli:              httpClient,
		cacheState:       _cacheSyncing, // default state
		cacheSynced:      make(chan struct{}),
		metricsCollector: o.MetricsCollector,
	}

	// Per-resource clients, each bound to this cluster.
	c.route = newRouteClient(c)
	c.upstream = newUpstreamClient(c)
	c.ssl = newSSLClient(c)
	c.streamRoute = newStreamRouteClient(c)
	c.globalRules = newGlobalRuleClient(c)
	c.consumer = newConsumerClient(c)
	c.plugin = newPluginClient(c)
	c.schema = newSchemaClient(c)
	c.pluginConfig = newPluginConfigClient(c)
	c.upstreamServiceRelation = newUpstreamServiceRelation(c)
	c.pluginMetadata = newPluginMetadataClient(c)

	if c.cache, err = cache.NewMemDBCache(); err != nil {
		return nil, err
	}

	go c.syncCache(ctx)
	go c.syncSchema(ctx, o.SyncInterval.Duration)

	return c, nil
}

// syncCache performs the initial cache fill, making up to five attempts spaced
// two seconds apart. When the retries end with an error the last sync error is
// stored in cacheSyncErr. In every case cacheSynced is closed and cacheState
// is moved from _cacheSyncing to _cacheSynced; finding any other state there
// is treated as a programming error and panics.
func (c *cluster) syncCache(ctx context.Context) {
	log.Infow("syncing cache", zap.String("cluster", c.name))
	startedAt := time.Now()
	// Report the outcome (log line plus metric) once the function exits.
	defer func() {
		if c.cacheSyncErr != nil {
			log.Errorw("failed to sync cache",
				zap.String("cost_time", time.Since(startedAt).String()),
				zap.String("cluster", c.name),
			)
			c.metricsCollector.IncrCacheSyncOperation("failure")
			return
		}
		log.Infow("cache synced",
			zap.String("cost_time", time.Since(startedAt).String()),
			zap.String("cluster", c.name),
		)
		c.metricsCollector.IncrCacheSyncOperation("success")
	}()

	var lastSyncErr error
	attempt := func() (bool, error) {
		// syncCacheOnce yields either (true, nil) or (false, err), so the
		// error is kept aside and only "done" drives the retry loop.
		var synced bool
		synced, lastSyncErr = c.syncCacheOnce(ctx)
		select {
		case <-ctx.Done():
			// Stop retrying as soon as the context is finished.
			return synced, context.Canceled
		default:
			return synced, nil
		}
	}
	retryPolicy := wait.Backoff{
		Steps:    5,
		Duration: 2 * time.Second,
		Factor:   1,
	}
	if err := wait.ExponentialBackoff(retryPolicy, attempt); err != nil {
		// Retries exhausted or cancelled: expose the last sync error.
		c.cacheSyncErr = lastSyncErr
	}
	close(c.cacheSynced)

	if !atomic.CompareAndSwapInt32(&c.cacheState, _cacheSyncing, _cacheSynced) {
		panic("dubious state when sync cache")
	}
}

// syncCacheOnce lists routes, upstreams, ssl objects, stream routes, global
// rules, consumers and plugin configs from APISIX (in that order) and then
// inserts them into the local cache. It returns (true, nil) on success and
// (false, err) on the first list or insert failure.
//
// NOTE(review): a failed consumer insert is only logged and does not abort the
// sync, unlike every other resource kind. Confirm whether that is intended.
func (c *cluster) syncCacheOnce(ctx context.Context) (bool, error) {
	// Phase 1: fetch everything before touching the cache.
	routeList, err := c.route.List(ctx)
	if err != nil {
		log.Errorf("failed to list routes in APISIX: %s", err)
		return false, err
	}
	upstreamList, err := c.upstream.List(ctx)
	if err != nil {
		log.Errorf("failed to list upstreams in APISIX: %s", err)
		return false, err
	}
	sslList, err := c.ssl.List(ctx)
	if err != nil {
		log.Errorf("failed to list ssl in APISIX: %s", err)
		return false, err
	}
	streamRouteList, err := c.streamRoute.List(ctx)
	if err != nil {
		log.Errorf("failed to list stream_routes in APISIX: %s", err)
		return false, err
	}
	globalRuleList, err := c.globalRules.List(ctx)
	if err != nil {
		log.Errorf("failed to list global_rules in APISIX: %s", err)
		return false, err
	}
	consumerList, err := c.consumer.List(ctx)
	if err != nil {
		log.Errorf("failed to list consumers in APISIX: %s", err)
		return false, err
	}
	pluginConfigList, err := c.pluginConfig.List(ctx)
	if err != nil {
		log.Errorf("failed to list plugin_configs in APISIX: %s", err)
		return false, err
	}

	// reportInsertFailure emits the common "insert failed" log entry:
	// the resource-specific field, then the cluster name and the error text.
	reportInsertFailure := func(msg string, subject zap.Field, cause error) {
		log.Errorw(msg,
			subject,
			zap.String("cluster", c.name),
			zap.String("error", cause.Error()),
		)
	}

	// Phase 2: populate the cache.
	for _, route := range routeList {
		if insertErr := c.cache.InsertRoute(route); insertErr != nil {
			reportInsertFailure("failed to insert route to cache", zap.String("route", route.ID), insertErr)
			return false, insertErr
		}
	}
	for _, ups := range upstreamList {
		if insertErr := c.cache.InsertUpstream(ups); insertErr != nil {
			reportInsertFailure("failed to insert upstream to cache", zap.String("upstream", ups.ID), insertErr)
			return false, insertErr
		}
	}
	for _, cert := range sslList {
		if insertErr := c.cache.InsertSSL(cert); insertErr != nil {
			reportInsertFailure("failed to insert ssl to cache", zap.String("ssl", cert.ID), insertErr)
			return false, insertErr
		}
	}
	for _, streamRoute := range streamRouteList {
		if insertErr := c.cache.InsertStreamRoute(streamRoute); insertErr != nil {
			reportInsertFailure("failed to insert stream_route to cache", zap.Any("stream_route", streamRoute), insertErr)
			return false, insertErr
		}
	}
	for _, rule := range globalRuleList {
		if insertErr := c.cache.InsertGlobalRule(rule); insertErr != nil {
			reportInsertFailure("failed to insert global_rule to cache", zap.Any("global_rule", rule), insertErr)
			return false, insertErr
		}
	}
	for _, consumer := range consumerList {
		// Logged only; the sync carries on (see the NOTE above).
		if insertErr := c.cache.InsertConsumer(consumer); insertErr != nil {
			reportInsertFailure("failed to insert consumer to cache", zap.Any("consumer", consumer), insertErr)
		}
	}
	for _, pc := range pluginConfigList {
		if insertErr := c.cache.InsertPluginConfig(pc); insertErr != nil {
			reportInsertFailure("failed to insert pluginConfig to cache", zap.String("pluginConfig", pc.ID), insertErr)
			return false, insertErr
		}
	}
	return true, nil
}

// String implements Cluster.String method.
// The result has the form "name=<name>; base_url=<baseURL>", where baseURL is
// the value stored by newCluster (trailing "/" already trimmed).
func (c *cluster) String() string {
	return fmt.Sprintf("name=%s; base_url=%s", c.name, c.baseURL)
}

// HasSynced implements Cluster.HasSynced method.
// It returns the recorded cache sync error if there is one, nil if the cache
// is already synced, and otherwise blocks until either the initial sync
// finishes or ctx is done (returning ctx.Err() in the latter case).
func (c *cluster) HasSynced(ctx context.Context) error {
	if c.cacheSyncErr != nil {
		return c.cacheSyncErr
	}
	if atomic.LoadInt32(&c.cacheState) == _cacheSynced {
		return nil
	}

	// The initial sync is still running: wait for it.
	waitStart := time.Now()
	log.Warnf("waiting cluster %s to ready, it may takes a while", c.name)
	select {
	case <-c.cacheSynced:
	case <-ctx.Done():
		log.Errorf("failed to wait cluster to ready: %s", ctx.Err())
		return ctx.Err()
	}

	// The channel is closed on failure as well, so the error has to be
	// re-checked. See
	// https://github.com/apache/apisix-ingress-controller/issues/448
	// for more details.
	if c.cacheSyncErr != nil {
		return c.cacheSyncErr
	}
	log.Warnf("cluster %s now is ready, cost time %s", c.name, time.Since(waitStart).String())
	return nil
}

// syncSchema refreshes the schema cache once immediately and then again on
// every tick of interval, until ctx is done. A failed refresh is logged and
// counted as a "schema"/"failure" sync operation; the loop keeps running.
func (c *cluster) syncSchema(ctx context.Context, interval time.Duration) {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		err := c.syncSchemaOnce(ctx)
		if err != nil {
			log.Errorf("failed to sync schema: %s", err)
			c.metricsCollector.IncrSyncOperation("schema", "failure")
		}

		// Block until the next round is due or we are asked to stop.
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
		}
	}
}

// syncSchemaOnce rebuilds the schema cache a single time.
// Every cached schema is deleted first (delete failures are only logged),
// then the plugin names are listed from APISIX and each plugin's schema is
// fetched and inserted. A plugin whose schema cannot be fetched or inserted is
// logged and skipped. Only a failure to list the cached schemas or the plugin
// names is returned as an error; otherwise a "schema"/"success" sync operation
// is recorded and nil is returned.
func (c *cluster) syncSchemaOnce(ctx context.Context) error {
	log.Infow("syncing schema", zap.String("cluster", c.name))

	// Step 1: drop what is currently cached.
	cached, err := c.cache.ListSchema()
	if err != nil {
		log.Errorf("failed to list schema in the cache: %s", err)
		return err
	}
	for _, old := range cached {
		delErr := c.cache.DeleteSchema(old)
		if delErr == nil {
			continue
		}
		log.Warnw("failed to delete schema in cache",
			zap.String("schemaName", old.Name),
			zap.String("schemaContent", old.Content),
			zap.String("error", delErr.Error()),
		)
	}

	// Step 2: refill with the plugins' schemas.
	pluginNames, err := c.plugin.List(ctx)
	if err != nil {
		log.Errorf("failed to list plugin names in APISIX: %s", err)
		return err
	}
	for _, pluginName := range pluginNames {
		fetched, fetchErr := c.schema.GetPluginSchema(ctx, pluginName)
		if fetchErr != nil {
			log.Warnw("failed to get plugin schema",
				zap.String("plugin", pluginName),
				zap.String("error", fetchErr.Error()),
			)
			continue
		}

		if insertErr := c.cache.InsertSchema(fetched); insertErr != nil {
			log.Warnw("failed to insert schema to cache",
				zap.String("plugin", pluginName),
				zap.String("cluster", c.name),
				zap.String("error", insertErr.Error()),
			)
		}
	}
	c.metricsCollector.IncrSyncOperation("schema", "success")
	return nil
}

// Route implements Cluster.Route method.
// It returns the route client that newCluster bound to this cluster.
func (c *cluster) Route() Route {
	return c.route
}

// Upstream implements Cluster.Upstream method.
// It returns the upstream client that newCluster bound to this cluster.
func (c *cluster) Upstream() Upstream {
	return c.upstream
}

// SSL implements Cluster.SSL method.
// It returns the SSL client that newCluster bound to this cluster.
func (c *cluster) SSL() SSL {
	return c.ssl
}

// StreamRoute implements Cluster.StreamRoute method.
// It returns the stream route client that newCluster bound to this cluster.
func (c *cluster) StreamRoute() StreamRoute {
	return c.streamRoute
}

// GlobalRule implements Cluster.GlobalRule method.
// It returns the global rule client that newCluster bound to this cluster.
func (c *cluster) GlobalRule() GlobalRule {
	return c.globalRules
}

// Consumer implements Cluster.Consumer method.
// It returns the consumer client that newCluster bound to this cluster.
func (c *cluster) Consumer() Consumer {
	return c.consumer
}

// Plugin implements Cluster.Plugin method.
// It returns the plugin client that newCluster bound to this cluster.
func (c *cluster) Plugin() Plugin {
	return c.plugin
}

// PluginConfig implements Cluster.PluginConfig method.
// It returns the plugin config client that newCluster bound to this cluster.
func (c *cluster) PluginConfig() PluginConfig {
	return c.pluginConfig
}

// Schema implements Cluster.Schema method.
// It returns the schema client that newCluster bound to this cluster.
func (c *cluster) Schema() Schema {
	return c.schema
}

// PluginMetadata returns the plugin metadata client that newCluster bound to
// this cluster.
func (c *cluster) PluginMetadata() PluginMetadata {
	return c.pluginMetadata
}

// UpstreamServiceRelation returns the upstream/service relation helper that
// newCluster bound to this cluster.
func (c *cluster) UpstreamServiceRelation() UpstreamServiceRelation {
	return c.upstreamServiceRelation
}

// HealthCheck implements Cluster.HealthCheck method.
// A recorded cache sync error is returned as is. While the initial cache sync
// is still running no probe is made and nil is returned. Otherwise the cluster
// is probed up to three times, five seconds apart; if the retry loop ends with
// an error, that error is returned and the last probe error is stored in
// cacheSyncErr.
func (c *cluster) HealthCheck(ctx context.Context) (err error) {
	if c.cacheSyncErr != nil {
		return c.cacheSyncErr
	}
	if atomic.LoadInt32(&c.cacheState) == _cacheSyncing {
		return nil
	}

	var lastCheckErr error
	probe := func() (bool, error) {
		lastCheckErr = c.healthCheck(ctx)
		if lastCheckErr == nil {
			return true, nil
		}
		// Report "not done" without an error so that the backoff retries.
		log.Warnf("failed to check health for cluster %s: %s, will retry", c.name, lastCheckErr)
		return false, nil
	}

	// Retry three times in a row, and exit if all of them fail.
	retryPolicy := wait.Backoff{
		Steps:    3,
		Duration: 5 * time.Second,
		Factor:   1,
	}
	err = wait.ExponentialBackoffWithContext(ctx, retryPolicy, probe)
	if err != nil {
		// Retries exhausted or context finished: remember the last probe error.
		c.cacheSyncErr = lastCheckErr
	}
	return err
}

// healthCheck probes the cluster once by opening a TCP connection to
// baseURLHost (3s dial timeout) and closing it again. It returns the dial
// error, if any; a failure to close the probe connection is only logged.
func (c *cluster) healthCheck(ctx context.Context) (err error) {
	// tcp socket probe
	d := net.Dialer{Timeout: 3 * time.Second}
	conn, err := d.DialContext(ctx, "tcp", c.baseURLHost)
	if err != nil {
		return err
	}
	if closeErr := conn.Close(); closeErr != nil {
		// Log the close error itself; the dial error is always nil here.
		log.Warnw("failed to close tcp probe connection",
			zap.Error(closeErr),
			zap.String("cluster", c.name),
		)
	}
	return nil
}

// applyAuth sets the X-API-Key header on req to the cluster's admin key.
// Requests are left untouched when no admin key is configured.
func (c *cluster) applyAuth(req *http.Request) {
	if c.adminKey == "" {
		return
	}
	req.Header.Set("X-API-Key", c.adminKey)
}

// do applies the cluster's authentication to req and sends it with the
// cluster's HTTP client, returning the client's response and error unchanged.
func (c *cluster) do(req *http.Request) (*http.Response, error) {
	c.applyAuth(req)
	return c.cli.Do(req)
}

// isFunctionDisabled reports whether body contains the substring
// "is disabled". The resource helpers in this file use it to map an error
// response body to ErrFunctionDisabled.
func (c *cluster) isFunctionDisabled(body string) bool {
	return strings.Contains(body, "is disabled")
}

// getResource issues a GET to url and decodes the response into an item.
// resource is used for logging and as the label of the status-code metric.
//
// For a non-200 response it returns ErrFunctionDisabled when the body says the
// function is disabled, cache.ErrNotFound for 404, and otherwise an error that
// carries the status code and the response body. With admin API v3 the body is
// decoded directly as an item; otherwise it is decoded as a getResponse and
// its Item is returned.
func (c *cluster) getResource(ctx context.Context, url, resource string) (*item, error) {
	log.Debugw("get resource in cluster",
		zap.String("cluster_name", c.name),
		zap.String("name", resource),
		zap.String("url", url),
	)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	sentAt := time.Now()
	resp, err := c.do(req)
	if err != nil {
		return nil, err
	}
	c.metricsCollector.RecordAPISIXLatency(time.Since(sentAt), "get")
	c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

	defer drainBody(resp.Body, url)
	if resp.StatusCode != http.StatusOK {
		msg := readBody(resp.Body, url)
		switch {
		case c.isFunctionDisabled(msg):
			return nil, ErrFunctionDisabled
		case resp.StatusCode == http.StatusNotFound:
			return nil, cache.ErrNotFound
		}
		return nil, multierr.Append(
			fmt.Errorf("unexpected status code %d", resp.StatusCode),
			fmt.Errorf("error message: %s", msg),
		)
	}

	decoder := json.NewDecoder(resp.Body)
	if c.adminVersion != "v3" {
		// v2 wraps the item in an envelope.
		var envelope getResponse
		if decodeErr := decoder.Decode(&envelope); decodeErr != nil {
			return nil, decodeErr
		}
		return &envelope.Item, nil
	}

	var single item
	if decodeErr := decoder.Decode(&single); decodeErr != nil {
		return nil, decodeErr
	}
	return &single, nil
}

// listResource issues a GET to url and decodes the response into items.
// resource is used for logging and as the label of the status-code metric.
//
// For a non-200 response it returns ErrFunctionDisabled when the body says the
// function is disabled, and otherwise an error that carries the status code
// and the response body. With admin API v3 the result is the List field of a
// listResponseV3; otherwise it is Node.Items of a listResponse.
func (c *cluster) listResource(ctx context.Context, url, resource string) (items, error) {
	log.Debugw("list resource in cluster",
		zap.String("cluster_name", c.name),
		zap.String("name", resource),
		zap.String("url", url),
	)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	sentAt := time.Now()
	resp, err := c.do(req)
	if err != nil {
		return nil, err
	}
	c.metricsCollector.RecordAPISIXLatency(time.Since(sentAt), "list")
	c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

	defer drainBody(resp.Body, url)
	if resp.StatusCode != http.StatusOK {
		msg := readBody(resp.Body, url)
		if c.isFunctionDisabled(msg) {
			return nil, ErrFunctionDisabled
		}
		return nil, multierr.Append(
			fmt.Errorf("unexpected status code %d", resp.StatusCode),
			fmt.Errorf("error message: %s", msg),
		)
	}

	decoder := json.NewDecoder(resp.Body)
	if c.adminVersion != "v3" {
		var v2 listResponse
		if decodeErr := decoder.Decode(&v2); decodeErr != nil {
			return nil, decodeErr
		}
		return v2.Node.Items, nil
	}

	var v3 listResponseV3
	if decodeErr := decoder.Decode(&v3); decodeErr != nil {
		return nil, decodeErr
	}
	return v3.List, nil
}

// createResource PUTs body to url and decodes the created object.
// resource is used for logging and as the label of the status-code metric.
//
// Status codes 200 and 201 count as success. For any other status it returns
// ErrFunctionDisabled when the response says the function is disabled, and
// otherwise an error that carries the status code and the response body. With
// admin API v3 the response is decoded as a createResponseV3, otherwise as a
// createResponse.
func (c *cluster) createResource(ctx context.Context, url, resource string, body []byte) (*item, error) {
	log.Debugw("creating resource in cluster",
		zap.String("cluster_name", c.name),
		zap.String("name", resource),
		zap.String("url", url),
		zap.ByteString("body", body),
	)
	req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	sentAt := time.Now()
	resp, err := c.do(req)
	if err != nil {
		return nil, err
	}
	c.metricsCollector.RecordAPISIXLatency(time.Since(sentAt), "create")
	c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

	defer drainBody(resp.Body, url)

	switch resp.StatusCode {
	case http.StatusCreated, http.StatusOK:
		// success, decode below
	default:
		msg := readBody(resp.Body, url)
		if c.isFunctionDisabled(msg) {
			return nil, ErrFunctionDisabled
		}
		return nil, multierr.Append(
			fmt.Errorf("unexpected status code %d", resp.StatusCode),
			fmt.Errorf("error message: %s", msg),
		)
	}

	decoder := json.NewDecoder(resp.Body)
	if c.adminVersion != "v3" {
		var v2 createResponse
		if decodeErr := decoder.Decode(&v2); decodeErr != nil {
			return nil, decodeErr
		}
		return &v2.Item, nil
	}

	var v3 createResponseV3
	if decodeErr := decoder.Decode(&v3); decodeErr != nil {
		return nil, decodeErr
	}
	return &v3.item, nil
}

// updateResource PUTs body to url and decodes the updated object.
// resource is used for logging and as the label of the status-code metric.
//
// Status codes 200 and 201 count as success. For any other status the response
// is logged at debug level, then ErrFunctionDisabled is returned when the
// response says the function is disabled, and otherwise an error that carries
// the status code and the response body. With admin API v3 the response is
// decoded as an updateResponseV3, otherwise as an updateResponse.
func (c *cluster) updateResource(ctx context.Context, url, resource string, body []byte) (*item, error) {
	log.Debugw("updating resource in cluster",
		zap.String("cluster_name", c.name),
		zap.String("name", resource),
		zap.String("url", url),
		zap.ByteString("body", body),
	)
	req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	start := time.Now()
	resp, err := c.do(req)
	if err != nil {
		return nil, err
	}
	c.metricsCollector.RecordAPISIXLatency(time.Since(start), "update")
	c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

	defer drainBody(resp.Body, url)

	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
		respBody := readBody(resp.Body, url)
		// Field keys are plain names, not format strings, so they must not
		// contain printf verbs.
		log.Debugw("update response",
			zap.Int("status_code", resp.StatusCode),
			zap.String("body", respBody),
		)
		if c.isFunctionDisabled(respBody) {
			return nil, ErrFunctionDisabled
		}
		err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode))
		err = multierr.Append(err, fmt.Errorf("error message: %s", respBody))
		return nil, err
	}
	if c.adminVersion == "v3" {
		var ur updateResponseV3

		dec := json.NewDecoder(resp.Body)
		if err := dec.Decode(&ur); err != nil {
			return nil, err
		}

		return &ur.item, nil
	}
	var ur updateResponse
	dec := json.NewDecoder(resp.Body)
	if err := dec.Decode(&ur); err != nil {
		return nil, err
	}
	return &ur.Item, nil
}

// deleteResource issues a DELETE to url.
// resource is used for logging and as the label of the status-code metric.
//
// Status codes 200, 204 and 404 are treated as success. For any other status
// it returns ErrFunctionDisabled when the response says the function is
// disabled, cache.ErrStillInUse when the response contains "still using", and
// otherwise an error that carries the status code and the response body.
func (c *cluster) deleteResource(ctx context.Context, url, resource string) error {
	log.Debugw("deleting resource in cluster",
		zap.String("cluster_name", c.name),
		zap.String("name", resource),
		zap.String("url", url),
	)
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
	if err != nil {
		return err
	}
	sentAt := time.Now()
	resp, err := c.do(req)
	if err != nil {
		return err
	}
	c.metricsCollector.RecordAPISIXLatency(time.Since(sentAt), "delete")
	c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

	defer drainBody(resp.Body, url)

	switch resp.StatusCode {
	case http.StatusOK, http.StatusNoContent, http.StatusNotFound:
		return nil
	}

	message := readBody(resp.Body, url)
	switch {
	case c.isFunctionDisabled(message):
		return ErrFunctionDisabled
	case strings.Contains(message, "still using"):
		return cache.ErrStillInUse
	}
	return multierr.Append(
		fmt.Errorf("unexpected status code %d", resp.StatusCode),
		fmt.Errorf("error message: %s", message),
	)
}

// drainBody consumes r until EOF and then closes it. Read and close failures
// are logged as warnings together with url, except for a read error whose
// message equals that of _errReadOnClosedResBody, which is ignored.
func drainBody(r io.ReadCloser, url string) {
	_, readErr := io.Copy(io.Discard, r)
	// A body that was already closed is expected here; stay quiet about it.
	if readErr != nil && readErr.Error() != _errReadOnClosedResBody.Error() {
		log.Warnw("failed to drain body (read)",
			zap.String("url", url),
			zap.Error(readErr),
		)
	}

	closeErr := r.Close()
	if closeErr == nil {
		return
	}
	log.Warnw("failed to drain body (close)",
		zap.String("url", url),
		zap.Error(closeErr),
	)
}

// readBody reads all of r, closes it, and returns the content as a string.
// A read failure is logged and yields the empty string; a close failure is
// only logged. url is used for the log entries.
func readBody(r io.ReadCloser, url string) string {
	defer func() {
		closeErr := r.Close()
		if closeErr != nil {
			log.Warnw("failed to close body", zap.String("url", url), zap.Error(closeErr))
		}
	}()

	payload, readErr := io.ReadAll(r)
	if readErr == nil {
		return string(payload)
	}
	log.Warnw("failed to read body", zap.String("url", url), zap.Error(readErr))
	return ""
}

// getSchema fetches url with GET and returns the raw response body as the
// schema of an APISIX object. resource labels the status-code metric.
// A 404 yields cache.ErrNotFound; any other non-200 status yields an error
// that carries the status code and the response body.
func (c *cluster) getSchema(ctx context.Context, url, resource string) (string, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return "", err
	}
	sentAt := time.Now()
	resp, err := c.do(req)
	if err != nil {
		return "", err
	}
	c.metricsCollector.RecordAPISIXLatency(time.Since(sentAt), "getSchema")
	c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

	defer drainBody(resp.Body, url)

	switch resp.StatusCode {
	case http.StatusOK:
		return readBody(resp.Body, url), nil
	case http.StatusNotFound:
		return "", cache.ErrNotFound
	default:
		return "", multierr.Append(
			fmt.Errorf("unexpected status code %d", resp.StatusCode),
			fmt.Errorf("error message: %s", readBody(resp.Body, url)),
		)
	}
}

// getList fetches url with GET, decodes the response as a JSON object and
// returns its top-level keys; the order follows Go map iteration and is
// therefore unspecified. resource labels the status-code metric.
// A 404 yields cache.ErrNotFound; any other non-200 status yields an error
// that carries the status code and the response body.
func (c *cluster) getList(ctx context.Context, url, resource string) ([]string, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	sentAt := time.Now()
	resp, err := c.do(req)
	if err != nil {
		return nil, err
	}
	c.metricsCollector.RecordAPISIXLatency(time.Since(sentAt), "getList")
	c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

	defer drainBody(resp.Body, url)

	switch resp.StatusCode {
	case http.StatusOK:
		// decode below
	case http.StatusNotFound:
		return nil, cache.ErrNotFound
	default:
		return nil, multierr.Append(
			fmt.Errorf("unexpected status code %d", resp.StatusCode),
			fmt.Errorf("error message: %s", readBody(resp.Body, url)),
		)
	}

	var decoded map[string]interface{}
	if decodeErr := json.NewDecoder(resp.Body).Decode(&decoded); decodeErr != nil {
		return nil, decodeErr
	}

	names := make([]string, 0, len(decoded))
	for key := range decoded {
		names = append(names, key)
	}
	return names, nil
}
