| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package apisix |
| |
| import ( |
| "bytes" |
| "context" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "io" |
| "net" |
| "net/http" |
| "net/url" |
| "strings" |
| "sync/atomic" |
| "time" |
| |
| "go.uber.org/multierr" |
| "go.uber.org/zap" |
| "k8s.io/apimachinery/pkg/util/wait" |
| |
| "github.com/apache/apisix-ingress-controller/pkg/apisix/cache" |
| "github.com/apache/apisix-ingress-controller/pkg/log" |
| "github.com/apache/apisix-ingress-controller/pkg/metrics" |
| "github.com/apache/apisix-ingress-controller/pkg/types" |
| ) |
| |
| const ( |
| _defaultTimeout = 5 * time.Second |
| _defaultSyncInterval = 6 * time.Hour |
| |
| _cacheSyncing = iota |
| _cacheSynced |
| ) |
| |
| var ( |
| // ErrClusterNotExist means a cluster doesn't exist. |
| ErrClusterNotExist = errors.New("client not exist") |
| // ErrDuplicatedCluster means the cluster adding request was |
| // rejected since the cluster was already created. |
| ErrDuplicatedCluster = errors.New("duplicated cluster") |
| // ErrFunctionDisabled means the APISIX function is disabled |
| ErrFunctionDisabled = errors.New("function disabled") |
| |
| _errReadOnClosedResBody = errors.New("http: read on closed response body") |
| |
| // Default shared transport for apisix client |
| _defaultTransport = &http.Transport{ |
| Proxy: http.ProxyFromEnvironment, |
| Dial: (&net.Dialer{ |
| Timeout: 3 * time.Second, |
| }).Dial, |
| DialContext: (&net.Dialer{ |
| Timeout: 3 * time.Second, |
| }).DialContext, |
| ResponseHeaderTimeout: 30 * time.Second, |
| ExpectContinueTimeout: 1 * time.Second, |
| } |
| ) |
| |
| // ClusterOptions contains parameters to customize APISIX client. |
| type ClusterOptions struct { |
| AdminAPIVersion string |
| Name string |
| AdminKey string |
| BaseURL string |
| Timeout time.Duration |
| // SyncInterval is the interval to sync schema. |
| SyncInterval types.TimeDuration |
| MetricsCollector metrics.Collector |
| } |
| |
| type cluster struct { |
| adminVersion string |
| name string |
| baseURL string |
| baseURLHost string |
| adminKey string |
| cli *http.Client |
| cacheState int32 |
| cache cache.Cache |
| cacheSynced chan struct{} |
| cacheSyncErr error |
| route Route |
| upstream Upstream |
| ssl SSL |
| streamRoute StreamRoute |
| globalRules GlobalRule |
| consumer Consumer |
| plugin Plugin |
| schema Schema |
| pluginConfig PluginConfig |
| metricsCollector metrics.Collector |
| upstreamServiceRelation UpstreamServiceRelation |
| pluginMetadata PluginMetadata |
| } |
| |
| func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) { |
| if o.BaseURL == "" { |
| return nil, errors.New("empty base url") |
| } |
| if o.Timeout == time.Duration(0) { |
| o.Timeout = _defaultTimeout |
| } |
| if o.SyncInterval.Duration == time.Duration(0) { |
| o.SyncInterval = types.TimeDuration{Duration: _defaultSyncInterval} |
| } |
| o.BaseURL = strings.TrimSuffix(o.BaseURL, "/") |
| |
| u, err := url.Parse(o.BaseURL) |
| if err != nil { |
| return nil, err |
| } |
| |
| // if the version is not v3, then fallback to v2 |
| adminVersion := o.AdminAPIVersion |
| if adminVersion != "v3" { |
| adminVersion = "v2" |
| } |
| c := &cluster{ |
| adminVersion: adminVersion, |
| name: o.Name, |
| baseURL: o.BaseURL, |
| baseURLHost: u.Host, |
| adminKey: o.AdminKey, |
| cli: &http.Client{ |
| Timeout: o.Timeout, |
| Transport: _defaultTransport, |
| }, |
| cacheState: _cacheSyncing, // default state |
| cacheSynced: make(chan struct{}), |
| metricsCollector: o.MetricsCollector, |
| } |
| c.route = newRouteClient(c) |
| c.upstream = newUpstreamClient(c) |
| c.ssl = newSSLClient(c) |
| c.streamRoute = newStreamRouteClient(c) |
| c.globalRules = newGlobalRuleClient(c) |
| c.consumer = newConsumerClient(c) |
| c.plugin = newPluginClient(c) |
| c.schema = newSchemaClient(c) |
| c.pluginConfig = newPluginConfigClient(c) |
| c.upstreamServiceRelation = newUpstreamServiceRelation(c) |
| c.pluginMetadata = newPluginMetadataClient(c) |
| |
| c.cache, err = cache.NewMemDBCache() |
| if err != nil { |
| return nil, err |
| } |
| |
| go c.syncCache(ctx) |
| go c.syncSchema(ctx, o.SyncInterval.Duration) |
| |
| return c, nil |
| } |
| |
| func (c *cluster) syncCache(ctx context.Context) { |
| log.Infow("syncing cache", zap.String("cluster", c.name)) |
| now := time.Now() |
| defer func() { |
| if c.cacheSyncErr == nil { |
| log.Infow("cache synced", |
| zap.String("cost_time", time.Since(now).String()), |
| zap.String("cluster", c.name), |
| ) |
| c.metricsCollector.IncrCacheSyncOperation("success") |
| } else { |
| log.Errorw("failed to sync cache", |
| zap.String("cost_time", time.Since(now).String()), |
| zap.String("cluster", c.name), |
| ) |
| c.metricsCollector.IncrCacheSyncOperation("failure") |
| } |
| }() |
| |
| backoff := wait.Backoff{ |
| Duration: 2 * time.Second, |
| Factor: 1, |
| Steps: 5, |
| } |
| var lastSyncErr error |
| err := wait.ExponentialBackoff(backoff, func() (done bool, err error) { |
| // impossibly return: false, nil |
| // so can safe used |
| done, lastSyncErr = c.syncCacheOnce(ctx) |
| select { |
| case <-ctx.Done(): |
| err = context.Canceled |
| default: |
| break |
| } |
| return |
| }) |
| if err != nil { |
| // if ErrWaitTimeout then set lastSyncErr |
| c.cacheSyncErr = lastSyncErr |
| } |
| close(c.cacheSynced) |
| |
| if !atomic.CompareAndSwapInt32(&c.cacheState, _cacheSyncing, _cacheSynced) { |
| panic("dubious state when sync cache") |
| } |
| } |
| |
| func (c *cluster) syncCacheOnce(ctx context.Context) (bool, error) { |
| routes, err := c.route.List(ctx) |
| if err != nil { |
| log.Errorf("failed to list routes in APISIX: %s", err) |
| return false, err |
| } |
| upstreams, err := c.upstream.List(ctx) |
| if err != nil { |
| log.Errorf("failed to list upstreams in APISIX: %s", err) |
| return false, err |
| } |
| ssl, err := c.ssl.List(ctx) |
| if err != nil { |
| log.Errorf("failed to list ssl in APISIX: %s", err) |
| return false, err |
| } |
| streamRoutes, err := c.streamRoute.List(ctx) |
| if err != nil { |
| log.Errorf("failed to list stream_routes in APISIX: %s", err) |
| return false, err |
| } |
| globalRules, err := c.globalRules.List(ctx) |
| if err != nil { |
| log.Errorf("failed to list global_rules in APISIX: %s", err) |
| return false, err |
| } |
| consumers, err := c.consumer.List(ctx) |
| if err != nil { |
| log.Errorf("failed to list consumers in APISIX: %s", err) |
| return false, err |
| } |
| pluginConfigs, err := c.pluginConfig.List(ctx) |
| if err != nil { |
| log.Errorf("failed to list plugin_configs in APISIX: %s", err) |
| return false, err |
| } |
| |
| for _, r := range routes { |
| if err := c.cache.InsertRoute(r); err != nil { |
| log.Errorw("failed to insert route to cache", |
| zap.String("route", r.ID), |
| zap.String("cluster", c.name), |
| zap.String("error", err.Error()), |
| ) |
| return false, err |
| } |
| } |
| for _, u := range upstreams { |
| if err := c.cache.InsertUpstream(u); err != nil { |
| log.Errorw("failed to insert upstream to cache", |
| zap.String("upstream", u.ID), |
| zap.String("cluster", c.name), |
| zap.String("error", err.Error()), |
| ) |
| return false, err |
| } |
| } |
| for _, s := range ssl { |
| if err := c.cache.InsertSSL(s); err != nil { |
| log.Errorw("failed to insert ssl to cache", |
| zap.String("ssl", s.ID), |
| zap.String("cluster", c.name), |
| zap.String("error", err.Error()), |
| ) |
| return false, err |
| } |
| } |
| for _, sr := range streamRoutes { |
| if err := c.cache.InsertStreamRoute(sr); err != nil { |
| log.Errorw("failed to insert stream_route to cache", |
| zap.Any("stream_route", sr), |
| zap.String("cluster", c.name), |
| zap.String("error", err.Error()), |
| ) |
| return false, err |
| } |
| } |
| for _, gr := range globalRules { |
| if err := c.cache.InsertGlobalRule(gr); err != nil { |
| log.Errorw("failed to insert global_rule to cache", |
| zap.Any("global_rule", gr), |
| zap.String("cluster", c.name), |
| zap.String("error", err.Error()), |
| ) |
| return false, err |
| } |
| } |
| for _, consumer := range consumers { |
| if err := c.cache.InsertConsumer(consumer); err != nil { |
| log.Errorw("failed to insert consumer to cache", |
| zap.Any("consumer", consumer), |
| zap.String("cluster", c.name), |
| zap.String("error", err.Error()), |
| ) |
| } |
| } |
| for _, u := range pluginConfigs { |
| if err := c.cache.InsertPluginConfig(u); err != nil { |
| log.Errorw("failed to insert pluginConfig to cache", |
| zap.String("pluginConfig", u.ID), |
| zap.String("cluster", c.name), |
| zap.String("error", err.Error()), |
| ) |
| return false, err |
| } |
| } |
| return true, nil |
| } |
| |
| // String implements Cluster.String method. |
| func (c *cluster) String() string { |
| return fmt.Sprintf("name=%s; base_url=%s", c.name, c.baseURL) |
| } |
| |
| // HasSynced implements Cluster.HasSynced method. |
| func (c *cluster) HasSynced(ctx context.Context) error { |
| if c.cacheSyncErr != nil { |
| return c.cacheSyncErr |
| } |
| if atomic.LoadInt32(&c.cacheState) == _cacheSynced { |
| return nil |
| } |
| |
| // still in sync |
| now := time.Now() |
| log.Warnf("waiting cluster %s to ready, it may takes a while", c.name) |
| select { |
| case <-ctx.Done(): |
| log.Errorf("failed to wait cluster to ready: %s", ctx.Err()) |
| return ctx.Err() |
| case <-c.cacheSynced: |
| if c.cacheSyncErr != nil { |
| // See https://github.com/apache/apisix-ingress-controller/issues/448 |
| // for more details. |
| return c.cacheSyncErr |
| } |
| log.Warnf("cluster %s now is ready, cost time %s", c.name, time.Since(now).String()) |
| return nil |
| } |
| } |
| |
| // syncSchema syncs schema from APISIX regularly according to the interval. |
| func (c *cluster) syncSchema(ctx context.Context, interval time.Duration) { |
| ticker := time.NewTicker(interval) |
| defer ticker.Stop() |
| |
| for { |
| if err := c.syncSchemaOnce(ctx); err != nil { |
| log.Errorf("failed to sync schema: %s", err) |
| c.metricsCollector.IncrSyncOperation("schema", "failure") |
| } |
| |
| select { |
| case <-ticker.C: |
| continue |
| case <-ctx.Done(): |
| return |
| } |
| } |
| } |
| |
| // syncSchemaOnce syncs schema from APISIX once. |
| // It firstly deletes all the schema in the cache, |
| // then queries and inserts to the cache. |
| func (c *cluster) syncSchemaOnce(ctx context.Context) error { |
| log.Infow("syncing schema", zap.String("cluster", c.name)) |
| |
| schemaList, err := c.cache.ListSchema() |
| if err != nil { |
| log.Errorf("failed to list schema in the cache: %s", err) |
| return err |
| } |
| for _, s := range schemaList { |
| if err := c.cache.DeleteSchema(s); err != nil { |
| log.Warnw("failed to delete schema in cache", |
| zap.String("schemaName", s.Name), |
| zap.String("schemaContent", s.Content), |
| zap.String("error", err.Error()), |
| ) |
| } |
| } |
| |
| // update plugins' schema. |
| pluginList, err := c.plugin.List(ctx) |
| if err != nil { |
| log.Errorf("failed to list plugin names in APISIX: %s", err) |
| return err |
| } |
| for _, p := range pluginList { |
| ps, err := c.schema.GetPluginSchema(ctx, p) |
| if err != nil { |
| log.Warnw("failed to get plugin schema", |
| zap.String("plugin", p), |
| zap.String("error", err.Error()), |
| ) |
| continue |
| } |
| |
| if err := c.cache.InsertSchema(ps); err != nil { |
| log.Warnw("failed to insert schema to cache", |
| zap.String("plugin", p), |
| zap.String("cluster", c.name), |
| zap.String("error", err.Error()), |
| ) |
| continue |
| } |
| } |
| c.metricsCollector.IncrSyncOperation("schema", "success") |
| return nil |
| } |
| |
| // Route implements Cluster.Route method. |
| func (c *cluster) Route() Route { |
| return c.route |
| } |
| |
| // Upstream implements Cluster.Upstream method. |
| func (c *cluster) Upstream() Upstream { |
| return c.upstream |
| } |
| |
| // SSL implements Cluster.SSL method. |
| func (c *cluster) SSL() SSL { |
| return c.ssl |
| } |
| |
| // StreamRoute implements Cluster.StreamRoute method. |
| func (c *cluster) StreamRoute() StreamRoute { |
| return c.streamRoute |
| } |
| |
| // GlobalRule implements Cluster.GlobalRule method. |
| func (c *cluster) GlobalRule() GlobalRule { |
| return c.globalRules |
| } |
| |
| // Consumer implements Cluster.Consumer method. |
| func (c *cluster) Consumer() Consumer { |
| return c.consumer |
| } |
| |
| // Plugin implements Cluster.Plugin method. |
| func (c *cluster) Plugin() Plugin { |
| return c.plugin |
| } |
| |
| // PluginConfig implements Cluster.PluginConfig method. |
| func (c *cluster) PluginConfig() PluginConfig { |
| return c.pluginConfig |
| } |
| |
| // Schema implements Cluster.Schema method. |
| func (c *cluster) Schema() Schema { |
| return c.schema |
| } |
| |
| func (c *cluster) PluginMetadata() PluginMetadata { |
| return c.pluginMetadata |
| } |
| |
| func (c *cluster) UpstreamServiceRelation() UpstreamServiceRelation { |
| return c.upstreamServiceRelation |
| } |
| |
| // HealthCheck implements Cluster.HealthCheck method. |
| func (c *cluster) HealthCheck(ctx context.Context) (err error) { |
| if c.cacheSyncErr != nil { |
| err = c.cacheSyncErr |
| return |
| } |
| if atomic.LoadInt32(&c.cacheState) == _cacheSyncing { |
| return |
| } |
| |
| // Retry three times in a row, and exit if all of them fail. |
| backoff := wait.Backoff{ |
| Duration: 5 * time.Second, |
| Factor: 1, |
| Steps: 3, |
| } |
| var lastCheckErr error |
| err = wait.ExponentialBackoffWithContext(ctx, backoff, func() (done bool, _ error) { |
| if lastCheckErr = c.healthCheck(ctx); lastCheckErr != nil { |
| log.Warnf("failed to check health for cluster %s: %s, will retry", c.name, lastCheckErr) |
| return |
| } |
| done = true |
| return |
| }) |
| if err != nil { |
| // if ErrWaitTimeout then set lastSyncErr |
| c.cacheSyncErr = lastCheckErr |
| } |
| return err |
| } |
| |
| func (c *cluster) healthCheck(ctx context.Context) (err error) { |
| // tcp socket probe |
| d := net.Dialer{Timeout: 3 * time.Second} |
| conn, err := d.DialContext(ctx, "tcp", c.baseURLHost) |
| if err != nil { |
| return err |
| } |
| if er := conn.Close(); er != nil { |
| log.Warnw("failed to close tcp probe connection", |
| zap.Error(err), |
| zap.String("cluster", c.name), |
| ) |
| } |
| return |
| } |
| |
| func (c *cluster) applyAuth(req *http.Request) { |
| if c.adminKey != "" { |
| req.Header.Set("X-API-Key", c.adminKey) |
| } |
| } |
| |
| func (c *cluster) do(req *http.Request) (*http.Response, error) { |
| c.applyAuth(req) |
| return c.cli.Do(req) |
| } |
| |
| func (c *cluster) isFunctionDisabled(body string) bool { |
| return strings.Contains(body, "is disabled") |
| } |
| |
| func (c *cluster) getResource(ctx context.Context, url, resource string) (*item, error) { |
| log.Debugw("get resource in cluster", |
| zap.String("cluster_name", c.name), |
| zap.String("name", resource), |
| zap.String("url", url), |
| ) |
| req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) |
| if err != nil { |
| return nil, err |
| } |
| start := time.Now() |
| resp, err := c.do(req) |
| if err != nil { |
| return nil, err |
| } |
| c.metricsCollector.RecordAPISIXLatency(time.Since(start), "get") |
| c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource) |
| |
| defer drainBody(resp.Body, url) |
| if resp.StatusCode != http.StatusOK { |
| body := readBody(resp.Body, url) |
| if c.isFunctionDisabled(body) { |
| return nil, ErrFunctionDisabled |
| } |
| if resp.StatusCode == http.StatusNotFound { |
| return nil, cache.ErrNotFound |
| } else { |
| err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode)) |
| err = multierr.Append(err, fmt.Errorf("error message: %s", body)) |
| } |
| return nil, err |
| } |
| |
| if c.adminVersion == "v3" { |
| var res item |
| |
| dec := json.NewDecoder(resp.Body) |
| if err := dec.Decode(&res); err != nil { |
| return nil, err |
| } |
| return &res, nil |
| } |
| var res getResponse |
| |
| dec := json.NewDecoder(resp.Body) |
| if err := dec.Decode(&res); err != nil { |
| return nil, err |
| } |
| return &res.Item, nil |
| } |
| |
| func (c *cluster) listResource(ctx context.Context, url, resource string) (items, error) { |
| log.Debugw("list resource in cluster", |
| zap.String("cluster_name", c.name), |
| zap.String("name", resource), |
| zap.String("url", url), |
| ) |
| req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) |
| if err != nil { |
| return nil, err |
| } |
| start := time.Now() |
| resp, err := c.do(req) |
| if err != nil { |
| return nil, err |
| } |
| c.metricsCollector.RecordAPISIXLatency(time.Since(start), "list") |
| c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource) |
| |
| defer drainBody(resp.Body, url) |
| if resp.StatusCode != http.StatusOK { |
| body := readBody(resp.Body, url) |
| if c.isFunctionDisabled(body) { |
| return nil, ErrFunctionDisabled |
| } |
| err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode)) |
| err = multierr.Append(err, fmt.Errorf("error message: %s", body)) |
| return nil, err |
| } |
| |
| if c.adminVersion == "v3" { |
| var list listResponseV3 |
| |
| dec := json.NewDecoder(resp.Body) |
| if err := dec.Decode(&list); err != nil { |
| return nil, err |
| } |
| return list.List, nil |
| } |
| var list listResponse |
| |
| dec := json.NewDecoder(resp.Body) |
| if err := dec.Decode(&list); err != nil { |
| return nil, err |
| } |
| return list.Node.Items, nil |
| } |
| |
| func (c *cluster) createResource(ctx context.Context, url, resource string, body []byte) (*item, error) { |
| log.Debugw("creating resource in cluster", |
| zap.String("cluster_name", c.name), |
| zap.String("name", resource), |
| zap.String("url", url), |
| zap.ByteString("body", body), |
| ) |
| req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body)) |
| if err != nil { |
| return nil, err |
| } |
| start := time.Now() |
| resp, err := c.do(req) |
| if err != nil { |
| return nil, err |
| } |
| c.metricsCollector.RecordAPISIXLatency(time.Since(start), "create") |
| c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource) |
| |
| defer drainBody(resp.Body, url) |
| |
| if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK { |
| body := readBody(resp.Body, url) |
| if c.isFunctionDisabled(body) { |
| return nil, ErrFunctionDisabled |
| } |
| err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode)) |
| err = multierr.Append(err, fmt.Errorf("error message: %s", body)) |
| return nil, err |
| } |
| |
| if c.adminVersion == "v3" { |
| var cr createResponseV3 |
| |
| dec := json.NewDecoder(resp.Body) |
| if err := dec.Decode(&cr); err != nil { |
| return nil, err |
| } |
| |
| return &cr.item, nil |
| } |
| var cr createResponse |
| dec := json.NewDecoder(resp.Body) |
| if err := dec.Decode(&cr); err != nil { |
| return nil, err |
| } |
| return &cr.Item, nil |
| } |
| |
| func (c *cluster) updateResource(ctx context.Context, url, resource string, body []byte) (*item, error) { |
| log.Debugw("updating resource in cluster", |
| zap.String("cluster_name", c.name), |
| zap.String("name", resource), |
| zap.String("url", url), |
| zap.ByteString("body", body), |
| ) |
| req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body)) |
| if err != nil { |
| return nil, err |
| } |
| start := time.Now() |
| resp, err := c.do(req) |
| if err != nil { |
| return nil, err |
| } |
| c.metricsCollector.RecordAPISIXLatency(time.Since(start), "update") |
| c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource) |
| |
| defer drainBody(resp.Body, url) |
| |
| if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { |
| body := readBody(resp.Body, url) |
| log.Debugw("update response", |
| zap.Int("status code %d", resp.StatusCode), |
| zap.String("body %s", body), |
| ) |
| if c.isFunctionDisabled(body) { |
| return nil, ErrFunctionDisabled |
| } |
| err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode)) |
| err = multierr.Append(err, fmt.Errorf("error message: %s", body)) |
| return nil, err |
| } |
| if c.adminVersion == "v3" { |
| var ur updateResponseV3 |
| |
| dec := json.NewDecoder(resp.Body) |
| if err := dec.Decode(&ur); err != nil { |
| return nil, err |
| } |
| |
| return &ur.item, nil |
| } |
| var ur updateResponse |
| dec := json.NewDecoder(resp.Body) |
| if err := dec.Decode(&ur); err != nil { |
| return nil, err |
| } |
| return &ur.Item, nil |
| } |
| |
| func (c *cluster) deleteResource(ctx context.Context, url, resource string) error { |
| log.Debugw("deleting resource in cluster", |
| zap.String("cluster_name", c.name), |
| zap.String("name", resource), |
| zap.String("url", url), |
| ) |
| req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil) |
| if err != nil { |
| return err |
| } |
| start := time.Now() |
| resp, err := c.do(req) |
| if err != nil { |
| return err |
| } |
| c.metricsCollector.RecordAPISIXLatency(time.Since(start), "delete") |
| c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource) |
| |
| defer drainBody(resp.Body, url) |
| |
| if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusNotFound { |
| message := readBody(resp.Body, url) |
| if c.isFunctionDisabled(message) { |
| return ErrFunctionDisabled |
| } |
| err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode)) |
| err = multierr.Append(err, fmt.Errorf("error message: %s", message)) |
| if strings.Contains(message, "still using") { |
| return cache.ErrStillInUse |
| } |
| return err |
| } |
| return nil |
| } |
| |
| // drainBody reads whole data until EOF from r, then close it. |
| func drainBody(r io.ReadCloser, url string) { |
| _, err := io.Copy(io.Discard, r) |
| if err != nil { |
| if err.Error() != _errReadOnClosedResBody.Error() { |
| log.Warnw("failed to drain body (read)", |
| zap.String("url", url), |
| zap.Error(err), |
| ) |
| } |
| } |
| |
| if err := r.Close(); err != nil { |
| log.Warnw("failed to drain body (close)", |
| zap.String("url", url), |
| zap.Error(err), |
| ) |
| } |
| } |
| |
| func readBody(r io.ReadCloser, url string) string { |
| defer func() { |
| if err := r.Close(); err != nil { |
| log.Warnw("failed to close body", zap.String("url", url), zap.Error(err)) |
| } |
| }() |
| data, err := io.ReadAll(r) |
| if err != nil { |
| log.Warnw("failed to read body", zap.String("url", url), zap.Error(err)) |
| return "" |
| } |
| return string(data) |
| } |
| |
| // getSchema returns the schema of APISIX object. |
| func (c *cluster) getSchema(ctx context.Context, url, resource string) (string, error) { |
| req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) |
| if err != nil { |
| return "", err |
| } |
| start := time.Now() |
| resp, err := c.do(req) |
| if err != nil { |
| return "", err |
| } |
| c.metricsCollector.RecordAPISIXLatency(time.Since(start), "getSchema") |
| c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource) |
| |
| defer drainBody(resp.Body, url) |
| if resp.StatusCode != http.StatusOK { |
| if resp.StatusCode == http.StatusNotFound { |
| return "", cache.ErrNotFound |
| } else { |
| err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode)) |
| err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url))) |
| } |
| return "", err |
| } |
| |
| return readBody(resp.Body, url), nil |
| } |
| |
| // getList returns a list of string. |
| func (c *cluster) getList(ctx context.Context, url, resource string) ([]string, error) { |
| req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) |
| if err != nil { |
| return nil, err |
| } |
| start := time.Now() |
| resp, err := c.do(req) |
| if err != nil { |
| return nil, err |
| } |
| c.metricsCollector.RecordAPISIXLatency(time.Since(start), "getList") |
| c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource) |
| |
| defer drainBody(resp.Body, url) |
| if resp.StatusCode != http.StatusOK { |
| if resp.StatusCode == http.StatusNotFound { |
| return nil, cache.ErrNotFound |
| } else { |
| err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode)) |
| err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url))) |
| } |
| return nil, err |
| } |
| |
| var listResponse map[string]interface{} |
| dec := json.NewDecoder(resp.Body) |
| if err := dec.Decode(&listResponse); err != nil { |
| return nil, err |
| } |
| res := make([]string, 0, len(listResponse)) |
| |
| for name := range listResponse { |
| res = append(res, name) |
| } |
| return res, nil |
| } |