add metric: check_cluster_health_total and sync_operation_total (#627)
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index 34e4492..ef4ef98 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -35,6 +35,7 @@
"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
"github.com/apache/apisix-ingress-controller/pkg/log"
+ "github.com/apache/apisix-ingress-controller/pkg/metrics"
"github.com/apache/apisix-ingress-controller/pkg/types"
)
@@ -80,23 +81,24 @@
}
type cluster struct {
- name string
- baseURL string
- baseURLHost string
- adminKey string
- cli *http.Client
- cacheState int32
- cache cache.Cache
- cacheSynced chan struct{}
- cacheSyncErr error
- route Route
- upstream Upstream
- ssl SSL
- streamRoute StreamRoute
- globalRules GlobalRule
- consumer Consumer
- plugin Plugin
- schema Schema
+ name string
+ baseURL string
+ baseURLHost string
+ adminKey string
+ cli *http.Client
+ cacheState int32
+ cache cache.Cache
+ cacheSynced chan struct{}
+ cacheSyncErr error
+ route Route
+ upstream Upstream
+ ssl SSL
+ streamRoute StreamRoute
+ globalRules GlobalRule
+ consumer Consumer
+ plugin Plugin
+ schema Schema
+ metricsCollector metrics.Collector
}
func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
@@ -125,8 +127,9 @@
Timeout: o.Timeout,
Transport: _defaultTransport,
},
- cacheState: _cacheSyncing, // default state
- cacheSynced: make(chan struct{}),
+ cacheState: _cacheSyncing, // default state
+ cacheSynced: make(chan struct{}),
+ metricsCollector: metrics.NewPrometheusCollector(),
}
c.route = newRouteClient(c)
c.upstream = newUpstreamClient(c)
@@ -322,6 +325,7 @@
for {
if err := c.syncSchemaOnce(ctx); err != nil {
log.Warnf("failed to sync schema: %s", err)
+ c.metricsCollector.IncrSyncOperation("schema", "failure")
}
select {
@@ -379,6 +383,7 @@
continue
}
}
+ c.metricsCollector.IncrSyncOperation("schema", "success")
return nil
}
diff --git a/pkg/ingress/apisix_consumer.go b/pkg/ingress/apisix_consumer.go
index f54aad4..0cf28ba 100644
--- a/pkg/ingress/apisix_consumer.go
+++ b/pkg/ingress/apisix_consumer.go
@@ -131,9 +131,11 @@
)
c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse)
+ c.controller.metricsCollector.IncrSyncOperation("consumer", "failure")
return err
}
+ c.controller.metricsCollector.IncrSyncOperation("consumer", "success")
c.controller.recorderEvent(ac, corev1.EventTypeNormal, _resourceSynced, nil)
return nil
}
diff --git a/pkg/ingress/apisix_tls.go b/pkg/ingress/apisix_tls.go
index 2ba036d..79c9478 100644
--- a/pkg/ingress/apisix_tls.go
+++ b/pkg/ingress/apisix_tls.go
@@ -146,7 +146,6 @@
c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}
-
c.controller.recorderEvent(tls, corev1.EventTypeNormal, _resourceSynced, nil)
c.controller.recordStatus(tls, _resourceSynced, nil, metav1.ConditionTrue)
return err
@@ -173,6 +172,7 @@
func (c *apisixTlsController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
+ c.controller.metricsCollector.IncrSyncOperation("ssl", "success")
return
}
log.Warnw("sync ApisixTls failed, will retry",
@@ -180,6 +180,7 @@
zap.Error(err),
)
c.workqueue.AddRateLimited(obj)
+ c.controller.metricsCollector.IncrSyncOperation("ssl", "failure")
}
func (c *apisixTlsController) onAdd(obj interface{}) {
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 4d59b52..2af856d 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -166,7 +166,7 @@
cfg: cfg,
apiServer: apiSrv,
apisix: client,
- metricsCollector: metrics.NewPrometheusCollector(podName, podNamespace),
+ metricsCollector: metrics.NewPrometheusCollector(),
kubeClient: kubeClient,
watchingNamespace: watchingNamespace,
secretSSLMap: new(sync.Map),
@@ -617,5 +617,6 @@
return
}
log.Debugf("success check health for default cluster")
+ c.metricsCollector.IncrCheckClusterHealth(c.name)
}
}
diff --git a/pkg/ingress/endpoint.go b/pkg/ingress/endpoint.go
index c2eb236..53889f8 100644
--- a/pkg/ingress/endpoint.go
+++ b/pkg/ingress/endpoint.go
@@ -90,12 +90,14 @@
func (c *endpointsController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
+ c.controller.metricsCollector.IncrSyncOperation("endpoint", "success")
return
}
log.Warnw("sync endpoints failed, will retry",
zap.Any("object", obj),
)
c.workqueue.AddRateLimited(obj)
+ c.controller.metricsCollector.IncrSyncOperation("endpoint", "failure")
}
func (c *endpointsController) onAdd(obj interface{}) {
diff --git a/pkg/ingress/endpointslice.go b/pkg/ingress/endpointslice.go
index c8eaa05..691e4ed 100644
--- a/pkg/ingress/endpointslice.go
+++ b/pkg/ingress/endpointslice.go
@@ -109,12 +109,14 @@
func (c *endpointSliceController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
+ c.controller.metricsCollector.IncrSyncOperation("endpointSlices", "success")
return
}
log.Warnw("sync endpointSlice failed, will retry",
zap.Any("object", obj),
)
c.workqueue.AddRateLimited(obj)
+ c.controller.metricsCollector.IncrSyncOperation("endpointSlices", "failure")
}
func (c *endpointSliceController) onAdd(obj interface{}) {
diff --git a/pkg/ingress/secret.go b/pkg/ingress/secret.go
index 2d8e763..700bdd7 100644
--- a/pkg/ingress/secret.go
+++ b/pkg/ingress/secret.go
@@ -215,6 +215,7 @@
func (c *secretController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
+ c.controller.metricsCollector.IncrSyncOperation("secret", "success")
return
}
log.Warnw("sync ApisixTls failed, will retry",
@@ -222,6 +223,7 @@
zap.Error(err),
)
c.workqueue.AddRateLimited(obj)
+ c.controller.metricsCollector.IncrSyncOperation("secret", "failure")
}
func (c *secretController) onAdd(obj interface{}) {
diff --git a/pkg/metrics/prometheus.go b/pkg/metrics/prometheus.go
index 5f3fbfc..56aa1f1 100644
--- a/pkg/metrics/prometheus.go
+++ b/pkg/metrics/prometheus.go
@@ -15,6 +15,7 @@
package metrics
import (
+ "os"
"strconv"
"time"
@@ -37,20 +38,33 @@
RecordAPISIXLatency(time.Duration)
// IncrAPISIXRequest increases the number of requests to apisix.
IncrAPISIXRequest(string)
+ // IncrCheckClusterHealth increases the number of cluster health check operations
+ // with the cluster name label.
+ IncrCheckClusterHealth(string)
+ // IncrSyncOperation increases the number of sync operations with the resource
+ // type and result labels.
+ IncrSyncOperation(string, string)
}
// collector contains necessary messages to collect Prometheus metrics.
type collector struct {
- isLeader prometheus.Gauge
- apisixLatency prometheus.Summary
- apisixRequests *prometheus.CounterVec
- apisixCodes *prometheus.GaugeVec
+ isLeader prometheus.Gauge
+ apisixLatency prometheus.Summary
+ apisixRequests *prometheus.CounterVec
+ apisixCodes *prometheus.GaugeVec
+ checkClusterHealth *prometheus.CounterVec
+ syncOperation *prometheus.CounterVec
}
// NewPrometheusCollectors creates the Prometheus metrics collector.
// It also registers all internal metric collector to prometheus,
// so do not call this function duplicately.
-func NewPrometheusCollector(podName, podNamespace string) Collector {
+func NewPrometheusCollector() Collector {
+ podName := os.Getenv("POD_NAME")
+ podNamespace := os.Getenv("POD_NAMESPACE")
+ if podNamespace == "" {
+ podNamespace = "default"
+ }
constLabels := prometheus.Labels{
"controller_pod": podName,
"controller_namespace": podNamespace,
@@ -91,6 +105,24 @@
},
[]string{"resource"},
),
+ checkClusterHealth: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: _namespace,
+ Name: "check_cluster_health_total",
+ Help: "Number of cluster health check operations",
+ ConstLabels: constLabels,
+ },
+ []string{"name"},
+ ),
+ syncOperation: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: _namespace,
+ Name: "sync_operation_total",
+ Help: "Number of sync operations",
+ ConstLabels: constLabels,
+ },
+ []string{"resource", "result"},
+ ),
}
// Since we use the DefaultRegisterer, in test cases, the metrics
@@ -99,12 +131,16 @@
prometheus.Unregister(collector.apisixCodes)
prometheus.Unregister(collector.apisixLatency)
prometheus.Unregister(collector.apisixRequests)
+ prometheus.Unregister(collector.checkClusterHealth)
+ prometheus.Unregister(collector.syncOperation)
prometheus.MustRegister(
collector.isLeader,
collector.apisixCodes,
collector.apisixLatency,
collector.apisixRequests,
+ collector.checkClusterHealth,
+ collector.syncOperation,
)
return collector
@@ -140,6 +176,21 @@
c.apisixRequests.WithLabelValues(resource).Inc()
}
+// IncrCheckClusterHealth increases the number of cluster health check
+// operations.
+func (c *collector) IncrCheckClusterHealth(name string) {
+ c.checkClusterHealth.WithLabelValues(name).Inc()
+}
+
+// IncrSyncOperation increases the number of sync operations for the given
+// resource and result.
+func (c *collector) IncrSyncOperation(resource, result string) {
+ c.syncOperation.With(prometheus.Labels{
+ "resource": resource,
+ "result": result,
+ }).Inc()
+}
+
// Collect collects the prometheus.Collect.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
c.isLeader.Collect(ch)
@@ -147,6 +198,8 @@
c.apisixRequests.Collect(ch)
c.apisixLatency.Collect(ch)
c.apisixCodes.Collect(ch)
+ c.checkClusterHealth.Collect(ch)
+ c.syncOperation.Collect(ch)
}
// Describe describes the prometheus.Describe.
@@ -156,4 +209,6 @@
c.apisixRequests.Describe(ch)
c.apisixLatency.Describe(ch)
c.apisixCodes.Describe(ch)
+ c.checkClusterHealth.Describe(ch)
+ c.syncOperation.Describe(ch)
}
diff --git a/pkg/metrics/prometheus_test.go b/pkg/metrics/prometheus_test.go
index b8d4192..e62cdd9 100644
--- a/pkg/metrics/prometheus_test.go
+++ b/pkg/metrics/prometheus_test.go
@@ -30,11 +30,12 @@
assert.Equal(t, metric.Type.String(), "GAUGE")
m := metric.GetMetric()
assert.Len(t, m, 2)
+
assert.Equal(t, *m[0].Gauge.Value, float64(1))
assert.Equal(t, *m[0].Label[0].Name, "controller_namespace")
assert.Equal(t, *m[0].Label[0].Value, "default")
assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
- assert.Equal(t, *m[0].Label[1].Value, "test")
+ assert.Equal(t, *m[0].Label[1].Value, "")
assert.Equal(t, *m[0].Label[2].Name, "resource")
assert.Equal(t, *m[0].Label[2].Value, "route")
assert.Equal(t, *m[0].Label[3].Name, "status_code")
@@ -44,7 +45,7 @@
assert.Equal(t, *m[1].Label[0].Name, "controller_namespace")
assert.Equal(t, *m[1].Label[0].Value, "default")
assert.Equal(t, *m[1].Label[1].Name, "controller_pod")
- assert.Equal(t, *m[1].Label[1].Value, "test")
+ assert.Equal(t, *m[1].Label[1].Value, "")
assert.Equal(t, *m[1].Label[2].Name, "resource")
assert.Equal(t, *m[1].Label[2].Value, "upstream")
assert.Equal(t, *m[1].Label[3].Name, "status_code")
@@ -64,7 +65,7 @@
assert.Equal(t, *m[0].Label[0].Name, "controller_namespace")
assert.Equal(t, *m[0].Label[0].Value, "default")
assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
- assert.Equal(t, *m[0].Label[1].Value, "test")
+ assert.Equal(t, *m[0].Label[1].Value, "")
}
}
@@ -81,7 +82,7 @@
assert.Equal(t, *m[0].Label[0].Name, "controller_namespace")
assert.Equal(t, *m[0].Label[0].Value, "default")
assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
- assert.Equal(t, *m[0].Label[1].Value, "test")
+ assert.Equal(t, *m[0].Label[1].Value, "")
}
}
@@ -97,7 +98,7 @@
assert.Equal(t, *m[0].Label[0].Name, "controller_namespace")
assert.Equal(t, *m[0].Label[0].Value, "default")
assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
- assert.Equal(t, *m[0].Label[1].Value, "test")
+ assert.Equal(t, *m[0].Label[1].Value, "")
assert.Equal(t, *m[0].Label[2].Name, "resource")
assert.Equal(t, *m[0].Label[2].Value, "route")
@@ -105,14 +106,62 @@
assert.Equal(t, *m[1].Label[0].Name, "controller_namespace")
assert.Equal(t, *m[1].Label[0].Value, "default")
assert.Equal(t, *m[1].Label[1].Name, "controller_pod")
- assert.Equal(t, *m[1].Label[1].Value, "test")
+ assert.Equal(t, *m[1].Label[1].Value, "")
assert.Equal(t, *m[1].Label[2].Name, "resource")
assert.Equal(t, *m[1].Label[2].Value, "upstream")
}
}
+func checkClusterHealthTestHandler(t *testing.T, metrics []*io_prometheus_client.MetricFamily) func(t *testing.T) {
+ return func(t *testing.T) {
+ metric := findMetric("apisix_ingress_controller_check_cluster_health_total", metrics)
+ assert.NotNil(t, metric)
+ assert.Equal(t, metric.Type.String(), "COUNTER")
+ m := metric.GetMetric()
+ assert.Len(t, m, 1)
+
+ assert.Equal(t, *m[0].Counter.Value, float64(1))
+ assert.Equal(t, *m[0].Label[0].Name, "controller_namespace")
+ assert.Equal(t, *m[0].Label[0].Value, "default")
+ assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
+ assert.Equal(t, *m[0].Label[1].Value, "")
+ assert.Equal(t, *m[0].Label[2].Name, "name")
+ assert.Equal(t, *m[0].Label[2].Value, "test")
+ }
+}
+
+func syncOperationTestHandler(t *testing.T, metrics []*io_prometheus_client.MetricFamily) func(t *testing.T) {
+ return func(t *testing.T) {
+ metric := findMetric("apisix_ingress_controller_sync_operation_total", metrics)
+ assert.NotNil(t, metric)
+ assert.Equal(t, metric.Type.String(), "COUNTER")
+ m := metric.GetMetric()
+ assert.Len(t, m, 2)
+
+ assert.Equal(t, *m[0].Counter.Value, float64(1))
+ assert.Equal(t, *m[0].Label[0].Name, "controller_namespace")
+ assert.Equal(t, *m[0].Label[0].Value, "default")
+ assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
+ assert.Equal(t, *m[0].Label[1].Value, "")
+ assert.Equal(t, *m[0].Label[2].Name, "resource")
+ assert.Equal(t, *m[0].Label[2].Value, "endpoint")
+ assert.Equal(t, *m[0].Label[3].Name, "result")
+ assert.Equal(t, *m[0].Label[3].Value, "success")
+
+ assert.Equal(t, *m[1].Counter.Value, float64(1))
+ assert.Equal(t, *m[1].Label[0].Name, "controller_namespace")
+ assert.Equal(t, *m[1].Label[0].Value, "default")
+ assert.Equal(t, *m[1].Label[1].Name, "controller_pod")
+ assert.Equal(t, *m[1].Label[1].Value, "")
+ assert.Equal(t, *m[1].Label[2].Name, "resource")
+ assert.Equal(t, *m[1].Label[2].Value, "schema")
+ assert.Equal(t, *m[1].Label[3].Name, "result")
+ assert.Equal(t, *m[1].Label[3].Value, "failure")
+ }
+}
+
func TestPrometheusCollector(t *testing.T) {
- c := NewPrometheusCollector("test", "default")
+ c := NewPrometheusCollector()
c.ResetLeader(true)
c.RecordAPISIXCode(404, "route")
c.RecordAPISIXCode(500, "upstream")
@@ -120,6 +169,9 @@
c.IncrAPISIXRequest("route")
c.IncrAPISIXRequest("route")
c.IncrAPISIXRequest("upstream")
+ c.IncrCheckClusterHealth("test")
+ c.IncrSyncOperation("schema", "failure")
+ c.IncrSyncOperation("endpoint", "success")
metrics, err := prometheus.DefaultGatherer.Gather()
assert.Nil(t, err)
@@ -128,6 +180,8 @@
t.Run("is_leader", isLeaderTestHandler(t, metrics))
t.Run("apisix_request_latencies", apisixLatencyTestHandler(t, metrics))
t.Run("apisix_requests", apisixRequestTestHandler(t, metrics))
+ t.Run("check_cluster_health_total", checkClusterHealthTestHandler(t, metrics))
+ t.Run("sync_operation_total", syncOperationTestHandler(t, metrics))
}
func findMetric(name string, metrics []*io_prometheus_client.MetricFamily) *io_prometheus_client.MetricFamily {