Enhance debuggability to help identify issues in the environment (#131)
diff --git a/CHANGES.md b/CHANGES.md
index ce1713f..b622d42 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -5,6 +5,9 @@
1.2.0
------------------
#### Features
+* Introduce `pprof` module.
+* Support exporting to multiple `telemetry` services.
+* Update the base docker image.
#### Bug Fixes
* Fix [CVE-2022-41721](https://avd.aquasec.com/nvd/cve-2022-41721).
diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index 0eec3e6..938dcf8 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -32,8 +32,8 @@
service: ${SATELLITE_TELEMETRY_SERVICE:satellite-service}
# The minimum running unit, such as the pod concept in the Kubernetes.
instance: ${SATELLITE_TELEMETRY_INSTANCE:satellite-instance}
- # Telemetry export type, support "prometheus", "metrics_service" or "none"
- export_type: ${SATELLITE_TELEMETRY_EXPORT_TYPE:prometheus}
+ # Telemetry export type, support "prometheus", "metrics_service", "pprof" or "none", multiple split by ","
+ export_type: ${SATELLITE_TELEMETRY_EXPORT_TYPE:prometheus,pprof}
# Export telemetry data through Prometheus server, only works on "export_type=prometheus".
prometheus:
# The prometheus server address.
@@ -48,6 +48,10 @@
interval: ${SATELLITE_TELEMETRY_METRICS_SERVICE_INTERVAL:10}
# The prefix of telemetry metric name
metric_prefix: ${SATELLITE_TELEMETRY_METRICS_SERVICE_METRIC_PREFIX:sw_stl_}
+  # Export the pprof service to help detect performance issues.
+ pprof:
+ # The pprof server address.
+ address: ${SATELLITE_TELEMETRY_PPROF_ADDRESS::6060}
# The sharing plugins referenced by the specific plugins in the different pipes.
sharing:
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 7d638c5..683d400 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -27,7 +27,7 @@
RUN VERSION=$VERSION make linux
RUN mv /src/bin/skywalking-satellite-${VERSION}-linux-amd64 /src/bin/skywalking-satellite
-FROM scratch
+FROM debian
VOLUME /skywalking/configs
diff --git a/docs/en/setup/examples/feature/telemetry-exporter/README.md b/docs/en/setup/examples/feature/telemetry-exporter/README.md
index da82ab7..929656b 100644
--- a/docs/en/setup/examples/feature/telemetry-exporter/README.md
+++ b/docs/en/setup/examples/feature/telemetry-exporter/README.md
@@ -1,6 +1,8 @@
# Telemetry Exporter
-Satellite supports two ways to export its own telemetry data, `prometheus` or `metrics-service`.
+Satellite supports three ways to export its own telemetry data: `prometheus`, `metrics_service` or `pprof`.
+
+Several export types can be enabled at the same time by separating them with commas, e.g. `prometheus,pprof`.
## Prometheus
@@ -18,7 +20,7 @@
service: ${SATELLITE_TELEMETRY_SERVICE:satellite-service}
# The minimum running unit, such as the pod concept in the Kubernetes.
instance: ${SATELLITE_TELEMETRY_SERVICE:satellite-instance}
- # Telemetry export type, support "prometheus", "metrics_service" or "none"
+ # Telemetry export type, support "prometheus", "metrics_service", "pprof" or "none"
export_type: ${SATELLITE_TELEMETRY_EXPORT_TYPE:prometheus}
# Export telemetry data through Prometheus server, only works on "export_type=prometheus".
prometheus:
@@ -45,7 +47,7 @@
service: ${SATELLITE_TELEMETRY_SERVICE:satellite-service}
# The minimum running unit, such as the pod concept in the Kubernetes.
instance: ${SATELLITE_TELEMETRY_SERVICE:satellite-instance}
- # Telemetry export type, support "prometheus", "metrics_service" or "none"
+ # Telemetry export type, support "prometheus", "metrics_service", "pprof" or "none"
export_type: ${SATELLITE_TELEMETRY_EXPORT_TYPE:metrics_service}
# Export telemetry data through native meter format to OAP backend, only works on "export_type=metrics_service".
metrics_service:
@@ -55,4 +57,20 @@
interval: ${SATELLITE_TELEMETRY_METRICS_SERVICE_INTERVAL:10}
# The prefix of telemetry metric name
metric_prefix: ${SATELLITE_TELEMETRY_METRICS_SERVICE_METRIC_PREFIX:sw_stl_}
+```
+
+## pprof
+
+The `pprof` exporter starts an HTTP server that serves the Go runtime profiling endpoints under `/debug/pprof/`, so you can inspect a running Satellite remotely and track down performance issues.
+
+```yaml
+
+# The Satellite self telemetry configuration.
+telemetry:
+  # Telemetry export type, support "prometheus", "metrics_service", "pprof" or "none", multiple split by ","
+  export_type: ${SATELLITE_TELEMETRY_EXPORT_TYPE:pprof}
+  # Export the pprof service to help detect performance issues.
+  pprof:
+    # The pprof server address.
+    address: ${SATELLITE_TELEMETRY_PPROF_ADDRESS::6060}
```
\ No newline at end of file
diff --git a/internal/satellite/boot/boot.go b/internal/satellite/boot/boot.go
index d441f0f..8e18b8d 100644
--- a/internal/satellite/boot/boot.go
+++ b/internal/satellite/boot/boot.go
@@ -43,6 +43,7 @@
// import the telemetry implements
_ "github.com/apache/skywalking-satellite/internal/satellite/telemetry/metricservice"
_ "github.com/apache/skywalking-satellite/internal/satellite/telemetry/none"
+ _ "github.com/apache/skywalking-satellite/internal/satellite/telemetry/pprof"
_ "github.com/apache/skywalking-satellite/internal/satellite/telemetry/prometheus"
)
diff --git a/internal/satellite/module/sender/sender.go b/internal/satellite/module/sender/sender.go
index 6bd1941..8409c85 100644
--- a/internal/satellite/module/sender/sender.go
+++ b/internal/satellite/module/sender/sender.go
@@ -119,6 +119,8 @@
// blocking output when disconnecting.
if atomic.LoadInt32(&s.blocking) == 1 {
time.Sleep(100 * time.Millisecond)
+ log.Logger.WithField("pipe", s.config.PipeName).
+ Debugf("the client connection is disconnect, blocking the buffer")
continue
}
select {
@@ -245,6 +247,12 @@
if err := f.Forward(batchEvents); err == nil {
s.sendCounter.Add(float64(len(batchEvents)), s.config.PipeName, "success", f.ForwardType().String())
continue
+ } else {
+ log.Logger.WithFields(logrus.Fields{
+ "pipe": s.config.PipeName,
+ "offset": batch.Last(),
+ "size": len(batchEvents),
+ }).Warnf("forward event failure: %v", err)
}
if !s.runningFallbacker.FallBack(batchEvents, f.Forward) {
s.sendCounter.Add(float64(len(batchEvents)), s.config.PipeName, "failure", f.ForwardType().String())
diff --git a/internal/satellite/telemetry/pprof/server.go b/internal/satellite/telemetry/pprof/server.go
new file mode 100644
index 0000000..c80b67b
--- /dev/null
+++ b/internal/satellite/telemetry/pprof/server.go
@@ -0,0 +1,95 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pprof
+
+import (
+	"net/http"
+	"net/http/pprof"
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/telemetry"
+	"github.com/apache/skywalking-satellite/internal/satellite/telemetry/none"
+)
+
+func init() {
+	telemetry.Register("pprof", &Server{}, false)
+}
+
+// Server serves the net/http/pprof profiling handlers over HTTP.
+// It exports no metric: every meter it creates is a no-op taken from the none package.
+type Server struct {
+	svr *http.Server
+}
+
+// Start registers the pprof handlers on a dedicated mux and serves them in the background on
+// config.PProfService.Address. A listen failure is only logged, so Start always returns nil.
+func (s *Server) Start(config *telemetry.Config) error {
+	mux := http.NewServeMux()
+	mux.HandleFunc("/debug/pprof/", pprof.Index)
+	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
+	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
+	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
+	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
+	s.svr = &http.Server{
+		Addr:              config.PProfService.Address,
+		ReadHeaderTimeout: 3 * time.Second,
+		Handler:           mux,
+	}
+	go func() {
+		log.Logger.WithField("addr", config.PProfService.Address).Debugf("start pprof server")
+		// ListenAndServe returns http.ErrServerClosed once Close is called: that is a normal shutdown, not a failure.
+		if err := s.svr.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+			log.Logger.WithField("addr", config.PProfService.Address).Warnf("starting pprof server failure: %v", err)
+		}
+	}()
+	return nil
+}
+
+// AfterSharingStart does nothing for the pprof server and always returns nil.
+func (s *Server) AfterSharingStart() error {
+	return nil
+}
+
+// Close stops the HTTP server. It returns nil when Start has not been called, because there is nothing to stop.
+func (s *Server) Close() error {
+	if s.svr == nil {
+		return nil
+	}
+	return s.svr.Close()
+}
+
+// NewCounter returns a no-op counter, the arguments are ignored.
+func (s *Server) NewCounter(name, help string, labels ...string) telemetry.Counter {
+	return &none.Counter{}
+}
+
+// NewGauge returns a no-op gauge, the arguments (including getter) are ignored.
+func (s *Server) NewGauge(name, help string, getter func() float64, labels ...string) telemetry.Gauge {
+	return &none.Gauge{}
+}
+
+// NewDynamicGauge returns a no-op dynamic gauge, the arguments are ignored.
+func (s *Server) NewDynamicGauge(name, help string, labels ...string) telemetry.DynamicGauge {
+	return &none.DynamicGauge{}
+}
+
+// NewTimer returns a no-op timer, the arguments are ignored.
+func (s *Server) NewTimer(name, help string, labels ...string) telemetry.Timer {
+	return &none.Timer{}
+}
diff --git a/internal/satellite/telemetry/telemetry.go b/internal/satellite/telemetry/telemetry.go
index 9b60183..27a7cf2 100644
--- a/internal/satellite/telemetry/telemetry.go
+++ b/internal/satellite/telemetry/telemetry.go
@@ -19,6 +19,10 @@
import (
"fmt"
+ "strings"
+ "time"
+
+ "github.com/hashicorp/go-multierror"
)
var (
@@ -33,12 +37,14 @@
Service string `mapstructure:"service"` // The service name.
Instance string `mapstructure:"instance"` // The instance name.
- // Telemetry export type, support "prometheus", "metrics_service" or "none"
+	// Telemetry export type, support "prometheus", "metrics_service", "pprof" or "none", multiple split by ","
ExportType string `mapstructure:"export_type"`
// Export telemetry data through Prometheus server, only works on "export_type=prometheus".
Prometheus PrometheusConfig `mapstructure:"prometheus"`
// Export telemetry data through native meter format to OAP backend, only works on "export_type=metrics_service".
MetricsService MetricsServiceConfig `mapstructure:"metrics_service"`
+	// Export the pprof service to help detect performance issues.
+ PProfService PProfServiceConfig `mapstructure:"pprof"`
}
type PrometheusConfig struct {
@@ -52,6 +58,10 @@
MetricPrefix string `mapstructure:"metric_prefix"` // The prefix of telemetry metric name
}
+type PProfServiceConfig struct {
+ Address string `mapstructure:"address"` // The pprof server address.
+}
+
type Server interface {
Start(config *Config) error
AfterSharingStart() error
@@ -72,12 +82,30 @@
// Init create the global telemetry center according to the config.
func Init(c *Config) error {
- currentServer = servers[c.ExportType]
- if currentServer == nil {
- return fmt.Errorf("could not found telemetry exporter: %s", c.ExportType)
+	types := strings.Split(c.ExportType, ",")
+	exportServers := make([]Server, 0, len(types))
+	// Resolve every exporter before starting any of them, so an unknown name does not leave earlier servers running.
+	for _, t := range types {
+		// Tolerate spaces around the separator, such as "prometheus, pprof".
+		name := strings.TrimSpace(t)
+		server := servers[name]
+		if server == nil {
+			return fmt.Errorf("could not found telemetry exporter: %s", name)
+		}
+		exportServers = append(exportServers, server)
+	}
+	for _, server := range exportServers {
+		if e := server.Start(c); e != nil {
+			return e
+		}
}
- return currentServer.Start(c)
+	if len(exportServers) > 1 {
+		currentServer = &MultipleServer{Servers: exportServers}
+	} else if len(exportServers) == 1 {
+		currentServer = exportServers[0]
+	}
+	return nil
}
func AfterShardingStart() error {
@@ -110,3 +133,153 @@
}
return currentServer
}
+
+// MultipleServer dispatches every Server call to all of the wrapped servers.
+type MultipleServer struct {
+	Servers []Server
+}
+
+// Start starts every wrapped server, keeps going after a failure and returns the combined errors (nil if none).
+func (s *MultipleServer) Start(config *Config) error {
+	var err error
+	for _, ss := range s.Servers {
+		if e := ss.Start(config); e != nil {
+			err = multierror.Append(err, e)
+		}
+	}
+	return err
+}
+
+// AfterSharingStart calls AfterSharingStart on every wrapped server and returns the combined errors (nil if none).
+func (s *MultipleServer) AfterSharingStart() error {
+	var err error
+	for _, ss := range s.Servers {
+		if e := ss.AfterSharingStart(); e != nil {
+			err = multierror.Append(err, e)
+		}
+	}
+	return err
+}
+
+// Close closes every wrapped server, even when an earlier one fails, and returns the combined errors (nil if none).
+func (s *MultipleServer) Close() error {
+	var err error
+	for _, ss := range s.Servers {
+		if e := ss.Close(); e != nil {
+			err = multierror.Append(err, e)
+		}
+	}
+	return err
+}
+
+// NewCounter creates the counter on every wrapped server and returns a MultipleCounter that updates all of them.
+func (s *MultipleServer) NewCounter(name, help string, labels ...string) Counter {
+	result := make([]Counter, 0, len(s.Servers))
+	for _, ss := range s.Servers {
+		result = append(result, ss.NewCounter(name, help, labels...))
+	}
+	return &MultipleCounter{Counters: result}
+}
+
+// NewGauge creates the gauge on every wrapped server, handing the same getter to each, and groups the results.
+func (s *MultipleServer) NewGauge(name, help string, getter func() float64, labels ...string) Gauge {
+	result := make([]Gauge, 0, len(s.Servers))
+	for _, ss := range s.Servers {
+		result = append(result, ss.NewGauge(name, help, getter, labels...))
+	}
+	return &MultipleGauge{Gauges: result}
+}
+
+// NewDynamicGauge creates the dynamic gauge on every wrapped server and returns a MultipleDynamicGauge over them.
+func (s *MultipleServer) NewDynamicGauge(name, help string, labels ...string) DynamicGauge {
+	result := make([]DynamicGauge, 0, len(s.Servers))
+	for _, ss := range s.Servers {
+		result = append(result, ss.NewDynamicGauge(name, help, labels...))
+	}
+	return &MultipleDynamicGauge{Gauges: result}
+}
+
+// NewTimer creates the timer on every wrapped server and returns a MultipleTimer that drives all of them.
+func (s *MultipleServer) NewTimer(name, help string, labels ...string) Timer {
+	result := make([]Timer, 0, len(s.Servers))
+	for _, ss := range s.Servers {
+		result = append(result, ss.NewTimer(name, help, labels...))
+	}
+	return &MultipleTimer{Timers: result}
+}
+
+// MultipleCounter forwards every counter update to all of the wrapped counters.
+type MultipleCounter struct {
+	Counters []Counter
+}
+
+// Inc calls Inc with the given label values on every wrapped counter.
+func (m *MultipleCounter) Inc(labelValues ...string) {
+	for _, c := range m.Counters {
+		c.Inc(labelValues...)
+	}
+}
+
+// Add calls Add with val and the given label values on every wrapped counter.
+func (m *MultipleCounter) Add(val float64, labelValues ...string) {
+	for _, c := range m.Counters {
+		c.Add(val, labelValues...)
+	}
+}
+
+// MultipleGauge holds the gauges that the wrapped servers created for one NewGauge call.
+type MultipleGauge struct {
+	Gauges []Gauge
+}
+
+// MultipleDynamicGauge forwards every dynamic gauge update to all of the wrapped gauges.
+type MultipleDynamicGauge struct {
+	Gauges []DynamicGauge
+}
+
+// Inc calls Inc with the given label values on every wrapped dynamic gauge.
+func (g *MultipleDynamicGauge) Inc(labelValues ...string) {
+	for _, ga := range g.Gauges {
+		ga.Inc(labelValues...)
+	}
+}
+
+// Dec calls Dec with the given label values on every wrapped dynamic gauge.
+func (g *MultipleDynamicGauge) Dec(labelValues ...string) {
+	for _, ga := range g.Gauges {
+		ga.Dec(labelValues...)
+	}
+}
+
+// MultipleTimer forwards every timer operation to all of the wrapped timers.
+type MultipleTimer struct {
+	Timers []Timer
+}
+
+// Start starts a recorder on every wrapped timer and returns a MultipleTimeRecorder that stops all of them.
+func (m *MultipleTimer) Start(labelValues ...string) TimeRecorder {
+	recorders := make([]TimeRecorder, 0, len(m.Timers))
+	for _, t := range m.Timers {
+		recorders = append(recorders, t.Start(labelValues...))
+	}
+	return &MultipleTimeRecorder{Recorders: recorders}
+}
+
+// AddTime calls AddTime with t and the given label values on every wrapped timer.
+func (m *MultipleTimer) AddTime(t time.Duration, labelValues ...string) {
+	for _, ti := range m.Timers {
+		ti.AddTime(t, labelValues...)
+	}
+}
+
+// MultipleTimeRecorder groups the recorders returned by the wrapped timers for one Start call.
+type MultipleTimeRecorder struct {
+	Recorders []TimeRecorder
+}
+
+// Stop calls Stop on every wrapped recorder.
+func (m *MultipleTimeRecorder) Stop() {
+	for _, r := range m.Recorders {
+		r.Stop()
+	}
+}
diff --git a/plugins/client/grpc/lb/load_balancer.go b/plugins/client/grpc/lb/load_balancer.go
index b1a62d0..ea6fe33 100644
--- a/plugins/client/grpc/lb/load_balancer.go
+++ b/plugins/client/grpc/lb/load_balancer.go
@@ -21,15 +21,14 @@
"hash/crc32"
"sync"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
- "google.golang.org/grpc/grpclog"
)
const Name = "satellite_lb"
-var logger = grpclog.Component(Name)
-
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &satelliteDynamicPickerBuilder{}, base.Config{HealthCheck: true})
}
@@ -42,7 +41,6 @@
}
func (s *satelliteDynamicPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
- logger.Infof("ready to build a new picker: %v", info)
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
@@ -81,6 +79,7 @@
func (s *satelliteDynamicPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// only one connection
if s.connCount == 1 {
+ log.Logger.Debugf("pick the connection: %s", s.cons[0].addr)
return balancer.PickResult{SubConn: s.cons[0].conn}, nil
}
@@ -93,6 +92,7 @@
// check exists appoint address
if config.appointAddr != "" {
if con := s.addrToConn[config.appointAddr]; con != nil {
+ log.Logger.Debugf("use the appoint connection: %s", config.appointAddr)
return balancer.PickResult{SubConn: con}, nil
}
}
@@ -102,6 +102,7 @@
connWrap := s.cons[routeIndex]
// update the address to the config
config.appointAddr = connWrap.addr
+ log.Logger.Debugf("pick the connection: %s", connWrap.addr)
return balancer.PickResult{SubConn: connWrap.conn}, nil
}
@@ -110,6 +111,7 @@
sc := s.cons[s.next]
s.next = (s.next + 1) % s.connCount
s.mu.Unlock()
+ log.Logger.Debugf("pick the connection: %s", sc.addr)
return balancer.PickResult{SubConn: sc.conn}
}