add metric: check_cluster_health and sync_operation_total (#627)

diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index 34e4492..ef4ef98 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -35,6 +35,7 @@
 
 	"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
 	"github.com/apache/apisix-ingress-controller/pkg/log"
+	"github.com/apache/apisix-ingress-controller/pkg/metrics"
 	"github.com/apache/apisix-ingress-controller/pkg/types"
 )
 
@@ -80,23 +81,24 @@
 }
 
 type cluster struct {
-	name         string
-	baseURL      string
-	baseURLHost  string
-	adminKey     string
-	cli          *http.Client
-	cacheState   int32
-	cache        cache.Cache
-	cacheSynced  chan struct{}
-	cacheSyncErr error
-	route        Route
-	upstream     Upstream
-	ssl          SSL
-	streamRoute  StreamRoute
-	globalRules  GlobalRule
-	consumer     Consumer
-	plugin       Plugin
-	schema       Schema
+	name             string
+	baseURL          string
+	baseURLHost      string
+	adminKey         string
+	cli              *http.Client
+	cacheState       int32
+	cache            cache.Cache
+	cacheSynced      chan struct{}
+	cacheSyncErr     error
+	route            Route
+	upstream         Upstream
+	ssl              SSL
+	streamRoute      StreamRoute
+	globalRules      GlobalRule
+	consumer         Consumer
+	plugin           Plugin
+	schema           Schema
+	metricsCollector metrics.Collector
 }
 
 func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
@@ -125,8 +127,9 @@
 			Timeout:   o.Timeout,
 			Transport: _defaultTransport,
 		},
-		cacheState:  _cacheSyncing, // default state
-		cacheSynced: make(chan struct{}),
+		cacheState:       _cacheSyncing, // default state
+		cacheSynced:      make(chan struct{}),
+		metricsCollector: metrics.NewPrometheusCollector(),
 	}
 	c.route = newRouteClient(c)
 	c.upstream = newUpstreamClient(c)
@@ -322,6 +325,7 @@
 	for {
 		if err := c.syncSchemaOnce(ctx); err != nil {
 			log.Warnf("failed to sync schema: %s", err)
+			c.metricsCollector.IncrSyncOperation("schema", "failure")
 		}
 
 		select {
@@ -379,6 +383,7 @@
 			continue
 		}
 	}
+	c.metricsCollector.IncrSyncOperation("schema", "success")
 	return nil
 }
 
diff --git a/pkg/ingress/apisix_consumer.go b/pkg/ingress/apisix_consumer.go
index f54aad4..0cf28ba 100644
--- a/pkg/ingress/apisix_consumer.go
+++ b/pkg/ingress/apisix_consumer.go
@@ -131,9 +131,11 @@
 		)
 		c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
 		c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse)
+		c.controller.metricsCollector.IncrSyncOperation("consumer", "failure")
 		return err
 	}
 
+	c.controller.metricsCollector.IncrSyncOperation("consumer", "success")
 	c.controller.recorderEvent(ac, corev1.EventTypeNormal, _resourceSynced, nil)
 	return nil
 }
diff --git a/pkg/ingress/apisix_tls.go b/pkg/ingress/apisix_tls.go
index 2ba036d..79c9478 100644
--- a/pkg/ingress/apisix_tls.go
+++ b/pkg/ingress/apisix_tls.go
@@ -146,7 +146,6 @@
 		c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
 		return err
 	}
-
 	c.controller.recorderEvent(tls, corev1.EventTypeNormal, _resourceSynced, nil)
 	c.controller.recordStatus(tls, _resourceSynced, nil, metav1.ConditionTrue)
 	return err
@@ -173,6 +172,7 @@
 func (c *apisixTlsController) handleSyncErr(obj interface{}, err error) {
 	if err == nil {
 		c.workqueue.Forget(obj)
+		c.controller.metricsCollector.IncrSyncOperation("ssl", "success")
 		return
 	}
 	log.Warnw("sync ApisixTls failed, will retry",
@@ -180,6 +180,7 @@
 		zap.Error(err),
 	)
 	c.workqueue.AddRateLimited(obj)
+	c.controller.metricsCollector.IncrSyncOperation("ssl", "failure")
 }
 
 func (c *apisixTlsController) onAdd(obj interface{}) {
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 4d59b52..2af856d 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -166,7 +166,7 @@
 		cfg:               cfg,
 		apiServer:         apiSrv,
 		apisix:            client,
-		metricsCollector:  metrics.NewPrometheusCollector(podName, podNamespace),
+		metricsCollector:  metrics.NewPrometheusCollector(),
 		kubeClient:        kubeClient,
 		watchingNamespace: watchingNamespace,
 		secretSSLMap:      new(sync.Map),
@@ -617,5 +617,6 @@
 			return
 		}
 		log.Debugf("success check health for default cluster")
+		c.metricsCollector.IncrCheckClusterHealth(c.name)
 	}
 }
diff --git a/pkg/ingress/endpoint.go b/pkg/ingress/endpoint.go
index c2eb236..53889f8 100644
--- a/pkg/ingress/endpoint.go
+++ b/pkg/ingress/endpoint.go
@@ -90,12 +90,14 @@
 func (c *endpointsController) handleSyncErr(obj interface{}, err error) {
 	if err == nil {
 		c.workqueue.Forget(obj)
+		c.controller.metricsCollector.IncrSyncOperation("endpoint", "success")
 		return
 	}
 	log.Warnw("sync endpoints failed, will retry",
 		zap.Any("object", obj),
 	)
 	c.workqueue.AddRateLimited(obj)
+	c.controller.metricsCollector.IncrSyncOperation("endpoint", "failure")
 }
 
 func (c *endpointsController) onAdd(obj interface{}) {
diff --git a/pkg/ingress/endpointslice.go b/pkg/ingress/endpointslice.go
index c8eaa05..691e4ed 100644
--- a/pkg/ingress/endpointslice.go
+++ b/pkg/ingress/endpointslice.go
@@ -109,12 +109,14 @@
 func (c *endpointSliceController) handleSyncErr(obj interface{}, err error) {
 	if err == nil {
 		c.workqueue.Forget(obj)
+		c.controller.metricsCollector.IncrSyncOperation("endpointSlices", "success")
 		return
 	}
 	log.Warnw("sync endpointSlice failed, will retry",
 		zap.Any("object", obj),
 	)
 	c.workqueue.AddRateLimited(obj)
+	c.controller.metricsCollector.IncrSyncOperation("endpointSlices", "failure")
 }
 
 func (c *endpointSliceController) onAdd(obj interface{}) {
diff --git a/pkg/ingress/secret.go b/pkg/ingress/secret.go
index 2d8e763..700bdd7 100644
--- a/pkg/ingress/secret.go
+++ b/pkg/ingress/secret.go
@@ -215,6 +215,7 @@
 func (c *secretController) handleSyncErr(obj interface{}, err error) {
 	if err == nil {
 		c.workqueue.Forget(obj)
+		c.controller.metricsCollector.IncrSyncOperation("secret", "success")
 		return
 	}
 	log.Warnw("sync ApisixTls failed, will retry",
@@ -222,6 +223,7 @@
 		zap.Error(err),
 	)
 	c.workqueue.AddRateLimited(obj)
+	c.controller.metricsCollector.IncrSyncOperation("secret", "failure")
 }
 
 func (c *secretController) onAdd(obj interface{}) {
diff --git a/pkg/metrics/prometheus.go b/pkg/metrics/prometheus.go
index 5f3fbfc..56aa1f1 100644
--- a/pkg/metrics/prometheus.go
+++ b/pkg/metrics/prometheus.go
@@ -15,6 +15,7 @@
 package metrics
 
 import (
+	"os"
 	"strconv"
 	"time"
 
@@ -37,20 +38,33 @@
 	RecordAPISIXLatency(time.Duration)
 	// IncrAPISIXRequest increases the number of requests to apisix.
 	IncrAPISIXRequest(string)
+	// IncrCheckClusterHealth increases the number of cluster health check operations
+	// with the cluster name label.
+	IncrCheckClusterHealth(string)
+	// IncrSyncOperation increases the number of sync operations with the resource
+	// type label.
+	IncrSyncOperation(string, string)
 }
 
 // collector contains necessary messages to collect Prometheus metrics.
 type collector struct {
-	isLeader       prometheus.Gauge
-	apisixLatency  prometheus.Summary
-	apisixRequests *prometheus.CounterVec
-	apisixCodes    *prometheus.GaugeVec
+	isLeader           prometheus.Gauge
+	apisixLatency      prometheus.Summary
+	apisixRequests     *prometheus.CounterVec
+	apisixCodes        *prometheus.GaugeVec
+	checkClusterHealth *prometheus.CounterVec
+	syncOperation      *prometheus.CounterVec
 }
 
 // NewPrometheusCollectors creates the Prometheus metrics collector.
 // It also registers all internal metric collector to prometheus,
 // so do not call this function duplicately.
-func NewPrometheusCollector(podName, podNamespace string) Collector {
+func NewPrometheusCollector() Collector {
+	podName := os.Getenv("POD_NAME")
+	podNamespace := os.Getenv("POD_NAMESPACE")
+	if podNamespace == "" {
+		podNamespace = "default"
+	}
 	constLabels := prometheus.Labels{
 		"controller_pod":       podName,
 		"controller_namespace": podNamespace,
@@ -91,6 +105,24 @@
 			},
 			[]string{"resource"},
 		),
+		checkClusterHealth: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace:   _namespace,
+				Name:        "check_cluster_health_total",
+				Help:        "Number of cluster health check operations",
+				ConstLabels: constLabels,
+			},
+			[]string{"name"},
+		),
+		syncOperation: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace:   _namespace,
+				Name:        "sync_operation_total",
+				Help:        "Number of sync operations",
+				ConstLabels: constLabels,
+			},
+			[]string{"resource", "result"},
+		),
 	}
 
 	// Since we use the DefaultRegisterer, in test cases, the metrics
@@ -99,12 +131,16 @@
 	prometheus.Unregister(collector.apisixCodes)
 	prometheus.Unregister(collector.apisixLatency)
 	prometheus.Unregister(collector.apisixRequests)
+	prometheus.Unregister(collector.checkClusterHealth)
+	prometheus.Unregister(collector.syncOperation)
 
 	prometheus.MustRegister(
 		collector.isLeader,
 		collector.apisixCodes,
 		collector.apisixLatency,
 		collector.apisixRequests,
+		collector.checkClusterHealth,
+		collector.syncOperation,
 	)
 
 	return collector
@@ -140,6 +176,21 @@
 	c.apisixRequests.WithLabelValues(resource).Inc()
 }
 
+// IncrCheckClusterHealth increases the number of cluster health check
+// operations.
+func (c *collector) IncrCheckClusterHealth(name string) {
+	c.checkClusterHealth.WithLabelValues(name).Inc()
+}
+
+// IncrSyncOperation increases the number of sync operations for specific
+// resource.
+func (c *collector) IncrSyncOperation(resource, result string) {
+	c.syncOperation.With(prometheus.Labels{
+		"resource": resource,
+		"result":   result,
+	}).Inc()
+}
+
 // Collect collects the prometheus.Collect.
 func (c *collector) Collect(ch chan<- prometheus.Metric) {
 	c.isLeader.Collect(ch)
@@ -147,6 +198,8 @@
 	c.apisixRequests.Collect(ch)
 	c.apisixLatency.Collect(ch)
 	c.apisixCodes.Collect(ch)
+	c.checkClusterHealth.Collect(ch)
+	c.syncOperation.Collect(ch)
 }
 
 // Describe describes the prometheus.Describe.
@@ -156,4 +209,6 @@
 	c.apisixRequests.Describe(ch)
 	c.apisixLatency.Describe(ch)
 	c.apisixCodes.Describe(ch)
+	c.checkClusterHealth.Describe(ch)
+	c.syncOperation.Describe(ch)
 }
diff --git a/pkg/metrics/prometheus_test.go b/pkg/metrics/prometheus_test.go
index b8d4192..e62cdd9 100644
--- a/pkg/metrics/prometheus_test.go
+++ b/pkg/metrics/prometheus_test.go
@@ -30,11 +30,12 @@
 		assert.Equal(t, metric.Type.String(), "GAUGE")
 		m := metric.GetMetric()
 		assert.Len(t, m, 2)
+
 		assert.Equal(t, *m[0].Gauge.Value, float64(1))
 		assert.Equal(t, *m[0].Label[0].Name, "controller_namespace")
 		assert.Equal(t, *m[0].Label[0].Value, "default")
 		assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
-		assert.Equal(t, *m[0].Label[1].Value, "test")
+		assert.Equal(t, *m[0].Label[1].Value, "")
 		assert.Equal(t, *m[0].Label[2].Name, "resource")
 		assert.Equal(t, *m[0].Label[2].Value, "route")
 		assert.Equal(t, *m[0].Label[3].Name, "status_code")
@@ -44,7 +45,7 @@
 		assert.Equal(t, *m[1].Label[0].Name, "controller_namespace")
 		assert.Equal(t, *m[1].Label[0].Value, "default")
 		assert.Equal(t, *m[1].Label[1].Name, "controller_pod")
-		assert.Equal(t, *m[1].Label[1].Value, "test")
+		assert.Equal(t, *m[1].Label[1].Value, "")
 		assert.Equal(t, *m[1].Label[2].Name, "resource")
 		assert.Equal(t, *m[1].Label[2].Value, "upstream")
 		assert.Equal(t, *m[1].Label[3].Name, "status_code")
@@ -64,7 +65,7 @@
 		assert.Equal(t, *m[0].Label[0].Name, "controller_namespace")
 		assert.Equal(t, *m[0].Label[0].Value, "default")
 		assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
-		assert.Equal(t, *m[0].Label[1].Value, "test")
+		assert.Equal(t, *m[0].Label[1].Value, "")
 	}
 }
 
@@ -81,7 +82,7 @@
 		assert.Equal(t, *m[0].Label[0].Name, "controller_namespace")
 		assert.Equal(t, *m[0].Label[0].Value, "default")
 		assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
-		assert.Equal(t, *m[0].Label[1].Value, "test")
+		assert.Equal(t, *m[0].Label[1].Value, "")
 	}
 }
 
@@ -97,7 +98,7 @@
 		assert.Equal(t, *m[0].Label[0].Name, "controller_namespace")
 		assert.Equal(t, *m[0].Label[0].Value, "default")
 		assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
-		assert.Equal(t, *m[0].Label[1].Value, "test")
+		assert.Equal(t, *m[0].Label[1].Value, "")
 		assert.Equal(t, *m[0].Label[2].Name, "resource")
 		assert.Equal(t, *m[0].Label[2].Value, "route")
 
@@ -105,14 +106,62 @@
 		assert.Equal(t, *m[1].Label[0].Name, "controller_namespace")
 		assert.Equal(t, *m[1].Label[0].Value, "default")
 		assert.Equal(t, *m[1].Label[1].Name, "controller_pod")
-		assert.Equal(t, *m[1].Label[1].Value, "test")
+		assert.Equal(t, *m[1].Label[1].Value, "")
 		assert.Equal(t, *m[1].Label[2].Name, "resource")
 		assert.Equal(t, *m[1].Label[2].Value, "upstream")
 	}
 }
 
+func checkClusterHealthTestHandler(t *testing.T, metrics []*io_prometheus_client.MetricFamily) func(t *testing.T) {
+	return func(t *testing.T) {
+		metric := findMetric("apisix_ingress_controller_check_cluster_health_total", metrics)
+		assert.NotNil(t, metric)
+		assert.Equal(t, metric.Type.String(), "COUNTER")
+		m := metric.GetMetric()
+		assert.Len(t, m, 1)
+
+		assert.Equal(t, *m[0].Counter.Value, float64(1))
+		assert.Equal(t, *m[0].Label[0].Name, "controller_namespace")
+		assert.Equal(t, *m[0].Label[0].Value, "default")
+		assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
+		assert.Equal(t, *m[0].Label[1].Value, "")
+		assert.Equal(t, *m[0].Label[2].Name, "name")
+		assert.Equal(t, *m[0].Label[2].Value, "test")
+	}
+}
+
+func syncOperationTestHandler(t *testing.T, metrics []*io_prometheus_client.MetricFamily) func(t *testing.T) {
+	return func(t *testing.T) {
+		metric := findMetric("apisix_ingress_controller_sync_operation_total", metrics)
+		assert.NotNil(t, metric)
+		assert.Equal(t, metric.Type.String(), "COUNTER")
+		m := metric.GetMetric()
+		assert.Len(t, m, 2)
+
+		assert.Equal(t, *m[0].Counter.Value, float64(1))
+		assert.Equal(t, *m[0].Label[0].Name, "controller_namespace")
+		assert.Equal(t, *m[0].Label[0].Value, "default")
+		assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
+		assert.Equal(t, *m[0].Label[1].Value, "")
+		assert.Equal(t, *m[0].Label[2].Name, "resource")
+		assert.Equal(t, *m[0].Label[2].Value, "endpoint")
+		assert.Equal(t, *m[0].Label[3].Name, "result")
+		assert.Equal(t, *m[0].Label[3].Value, "success")
+
+		assert.Equal(t, *m[1].Counter.Value, float64(1))
+		assert.Equal(t, *m[1].Label[0].Name, "controller_namespace")
+		assert.Equal(t, *m[1].Label[0].Value, "default")
+		assert.Equal(t, *m[1].Label[1].Name, "controller_pod")
+		assert.Equal(t, *m[1].Label[1].Value, "")
+		assert.Equal(t, *m[1].Label[2].Name, "resource")
+		assert.Equal(t, *m[1].Label[2].Value, "schema")
+		assert.Equal(t, *m[1].Label[3].Name, "result")
+		assert.Equal(t, *m[1].Label[3].Value, "failure")
+	}
+}
+
 func TestPrometheusCollector(t *testing.T) {
-	c := NewPrometheusCollector("test", "default")
+	c := NewPrometheusCollector()
 	c.ResetLeader(true)
 	c.RecordAPISIXCode(404, "route")
 	c.RecordAPISIXCode(500, "upstream")
@@ -120,6 +169,9 @@
 	c.IncrAPISIXRequest("route")
 	c.IncrAPISIXRequest("route")
 	c.IncrAPISIXRequest("upstream")
+	c.IncrCheckClusterHealth("test")
+	c.IncrSyncOperation("schema", "failure")
+	c.IncrSyncOperation("endpoint", "success")
 
 	metrics, err := prometheus.DefaultGatherer.Gather()
 	assert.Nil(t, err)
@@ -128,6 +180,8 @@
 	t.Run("is_leader", isLeaderTestHandler(t, metrics))
 	t.Run("apisix_request_latencies", apisixLatencyTestHandler(t, metrics))
 	t.Run("apisix_requests", apisixRequestTestHandler(t, metrics))
+	t.Run("check_cluster_health_total", checkClusterHealthTestHandler(t, metrics))
+	t.Run("sync_operation_total", syncOperationTestHandler(t, metrics))
 }
 
 func findMetric(name string, metrics []*io_prometheus_client.MetricFamily) *io_prometheus_client.MetricFamily {