Add timeout config for the gRPC client (#132)
diff --git a/CHANGES.md b/CHANGES.md
index b622d42..7d0cf87 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,8 @@
* Introduce `pprof` module.
* Support export multiple `telemetry` service.
* Update the base docker image.
+* Add timeout configuration for gRPC client.
+* Reduce log print when the enqueue data to the pipeline error.
#### Bug Fixes
* Fix [CVE-2022-41721](https://avd.aquasec.com/nvd/cve-2022-41721).
diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index 938dcf8..fa9e559 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -120,6 +120,12 @@
check_period: ${SATELLITE_GRPC_CHECK_PERIOD:5}
# The auth value when send request
authentication: ${SATELLITE_GRPC_AUTHENTICATION:""}
+ # The gRPC send request timeout
+ timeout:
+ # The timeout for unary single request
+ unary: ${SATELLITE_GRPC_TIMEOUT_UNARY:5s}
+ # The timeout for unary stream request
+ stream: ${SATELLITE_GRPC_TIMEOUT_STREAM:20s}
servers:
- plugin_name: "grpc-server"
# The address of grpc server.
diff --git a/docs/en/setup/plugins/client_grpc-client.md b/docs/en/setup/plugins/client_grpc-client.md
index e56fed9..5ece8d9 100755
--- a/docs/en/setup/plugins/client_grpc-client.md
+++ b/docs/en/setup/plugins/client_grpc-client.md
@@ -41,6 +41,13 @@
# How frequently to check the connection(second)
check_period: 5
+
+# The gRPC send request timeout
+timeout:
+ # The timeout for unary single request
+ unary: 5s
+ # The timeout for unary stream request
+ stream: 20s
```
## Configuration
|Name|Type|Description|
@@ -68,4 +75,5 @@
| insecure_skip_verify | bool | Controls whether a client verifies the server's certificate chain and host name. |
| authentication | string | The auth value when send request |
| check_period | int | How frequently to check the connection(second) |
+| timeout | grpc.TimeoutConfig | The gRPC send request timeout |
diff --git a/internal/satellite/config/loader_test.go b/internal/satellite/config/loader_test.go
index 1dcddcc..42669bf 100644
--- a/internal/satellite/config/loader_test.go
+++ b/internal/satellite/config/loader_test.go
@@ -98,6 +98,10 @@
"check_period": 5,
"authentication": "",
"finder_type": "static",
+ "timeout": map[string]interface{}{
+ "unary": "5s",
+ "stream": "20s",
+ },
"kubernetes_config": map[string]interface{}{
"api_server": "",
"basic_auth": map[string]interface{}{
diff --git a/internal/satellite/module/gatherer/receiver_gatherer.go b/internal/satellite/module/gatherer/receiver_gatherer.go
index 9185e97..caf4657 100644
--- a/internal/satellite/module/gatherer/receiver_gatherer.go
+++ b/internal/satellite/module/gatherer/receiver_gatherer.go
@@ -39,6 +39,8 @@
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
)
+var enqueueErrorPrintInterval = 20 * time.Second
+
type ReceiverGatherer struct {
// config
config *api.GathererConfig
@@ -56,6 +58,9 @@
// sync invoker
processor processor.Processor
+
+ lastEnqueueErrorPrintTime time.Time
+ enqueueErrorCounter map[string]int
}
func (r *ReceiverGatherer) Prepare() error {
@@ -71,6 +76,8 @@
}
r.receiveCounter = telemetry.NewCounter("gatherer_receive_count", "Total number of the receiving count in the Gatherer.", "pipe", "status")
r.queueOutputCounter = telemetry.NewCounter("queue_output_count", "Total number of the output count in the Queue of Gatherer.", "pipe", "status")
+ r.lastEnqueueErrorPrintTime = time.Now()
+ r.enqueueErrorCounter = make(map[string]int)
return nil
}
@@ -88,11 +95,7 @@
r.receiveCounter.Inc(r.config.PipeName, "all")
err := r.runningQueue.Enqueue(e)
if err != nil {
- r.receiveCounter.Inc(r.config.PipeName, "abandoned")
- log.Logger.WithFields(logrus.Fields{
- "pipe": r.config.PipeName,
- "queue": r.runningQueue.Name(),
- }).Errorf("error in enqueue: %v", err)
+ r.recordEnqueueError(err)
}
case <-childCtx.Done():
cancel()
@@ -107,6 +110,23 @@
wg.Wait()
}
+func (r *ReceiverGatherer) recordEnqueueError(err error) {
+ r.receiveCounter.Inc(r.config.PipeName, "abandoned")
+ r.enqueueErrorCounter[err.Error()]++
+ if time.Now().After(r.lastEnqueueErrorPrintTime.Add(enqueueErrorPrintInterval)) {
+ for e, count := range r.enqueueErrorCounter {
+ log.Logger.WithFields(logrus.Fields{
+ "pipe": r.config.PipeName,
+ "queue": r.runningQueue.Name(),
+ "count": count,
+ }).Errorf("error in enqueue: %v", e)
+ }
+
+ r.lastEnqueueErrorPrintTime = time.Now()
+ r.enqueueErrorCounter = make(map[string]int)
+ }
+}
+
func (r *ReceiverGatherer) consumeQueue(ctx context.Context, p int, wg *sync.WaitGroup) {
go func() {
childCtx, cancel := context.WithCancel(ctx)
diff --git a/plugins/client/grpc/client.go b/plugins/client/grpc/client.go
index d5c9ea8..ad326f9 100644
--- a/plugins/client/grpc/client.go
+++ b/plugins/client/grpc/client.go
@@ -49,6 +49,8 @@
Authentication string `mapstructure:"authentication"` // The auth value when send request
CheckPeriod int `mapstructure:"check_period"` // How frequently to check the connection(second)
+ Timeout TimeoutConfig `mapstructure:"timeout"` // The gRPC send request timeout
+
// components
status api.ClientStatus
client *grpc.ClientConn
@@ -57,6 +59,11 @@
cancel context.CancelFunc // Parent ctx cancel function
}
+type TimeoutConfig struct {
+ Unary string `mapstructure:"unary"` // The timeout for unary single request
+ Stream string `mapstructure:"stream"` // The timeout for unary stream request
+}
+
func (c *Client) Name() string {
return Name
}
@@ -109,6 +116,13 @@
# How frequently to check the connection(second)
check_period: 5
+
+# The gRPC send request timeout
+timeout:
+ # The timeout for unary single request
+ unary: 5s
+ # The timeout for unary stream request
+ stream: 20s
`
}
diff --git a/plugins/client/grpc/client_config.go b/plugins/client/grpc/client_config.go
index b3f8e20..67641db 100644
--- a/plugins/client/grpc/client_config.go
+++ b/plugins/client/grpc/client_config.go
@@ -23,6 +23,7 @@
"crypto/x509"
"fmt"
"os"
+ "time"
"github.com/apache/skywalking-satellite/plugins/client/grpc/lb"
@@ -52,24 +53,39 @@
authHeader = metadata.New(map[string]string{"Authentication": c.Authentication})
}
+ unaryRequestTimeout, err := time.ParseDuration(c.Timeout.Unary)
+ if err != nil {
+ return nil, fmt.Errorf("cannot parse the unary request timeout: %v", err)
+ }
+ streamRequestTimeout, err := time.ParseDuration(c.Timeout.Stream)
+ if err != nil {
+ return nil, fmt.Errorf("cannot parse the stream request timeout: %v", err)
+ }
+
// append auth or report error
options = append(options, grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc,
cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
if authHeader != nil {
ctx = metadata.NewOutgoingContext(ctx, authHeader)
}
- clientStream, err := streamer(ctx, desc, cc, method, opts...)
+ timeout, timeoutFunc := context.WithTimeout(ctx, streamRequestTimeout)
+ clientStream, err := streamer(timeout, desc, cc, method, opts...)
if err != nil {
+ timeoutFunc()
c.reportError(err)
+ return nil, err
}
- return clientStream, err
+ streamWrapper := &timeoutClientStream{clientStream, timeoutFunc}
+ return streamWrapper, err
}))
grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if authHeader != nil {
ctx = metadata.NewOutgoingContext(ctx, authHeader)
}
- err := invoker(ctx, method, req, reply, cc, opts...)
+ timeout, timeoutFunc := context.WithTimeout(ctx, unaryRequestTimeout)
+ defer timeoutFunc()
+ err := invoker(timeout, method, req, reply, cc, opts...)
if err != nil {
c.reportError(err)
}
@@ -82,6 +98,16 @@
return &options, nil
}
+type timeoutClientStream struct {
+ grpc.ClientStream
+ timeoutFunc context.CancelFunc
+}
+
+func (t *timeoutClientStream) RecvMsg(m interface{}) error {
+ defer t.timeoutFunc()
+ return t.ClientStream.RecvMsg(m)
+}
+
// configTLS loads and parse the TLS configs.
func (c *Client) configTLS() (tc *tls.Config, tlsErr error) {
if err := checkTLSFile(c.CaPemPath); err != nil {
diff --git a/plugins/client/grpc/client_sniffer.go b/plugins/client/grpc/client_sniffer.go
index 2865c7c..e909410 100644
--- a/plugins/client/grpc/client_sniffer.go
+++ b/plugins/client/grpc/client_sniffer.go
@@ -25,6 +25,7 @@
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/status"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/plugins/client/api"
)
@@ -37,6 +38,7 @@
select {
case <-timeTicker.C:
state := c.client.GetState()
+ log.Logger.Debugf("current grpc client state: %s", state)
if state == connectivity.Shutdown || state == connectivity.TransientFailure {
c.updateStatus(api.Disconnect)
} else if state == connectivity.Ready || state == connectivity.Idle {
diff --git a/plugins/queue/partition/partitioned_queue.go b/plugins/queue/partition/partitioned_queue.go
index b8b01c0..550899c 100644
--- a/plugins/queue/partition/partitioned_queue.go
+++ b/plugins/queue/partition/partitioned_queue.go
@@ -149,7 +149,7 @@
return checkPartition, nil
}
}
- return 0, fmt.Errorf("the queue is full")
+ return 0, api.ErrFull
}
func (p *PartitionedQueue) registerQueueTelemetry(pipeline string) {