Add timeout config for the gRPC client (#132)

diff --git a/CHANGES.md b/CHANGES.md
index b622d42..7d0cf87 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,8 @@
 * Introduce `pprof` module.
 * Support export multiple `telemetry` service.
 * Update the base docker image.
+* Add timeout configuration for gRPC client.
+* Reduce log print when the enqueue data to the pipeline error.
 
 #### Bug Fixes
 * Fix [CVE-2022-41721](https://avd.aquasec.com/nvd/cve-2022-41721).
diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index 938dcf8..fa9e559 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -120,6 +120,12 @@
       check_period: ${SATELLITE_GRPC_CHECK_PERIOD:5}
       # The auth value when send request
       authentication: ${SATELLITE_GRPC_AUTHENTICATION:""}
+      # The gRPC send request timeout
+      timeout:
+        # The timeout for unary single request
+        unary: ${SATELLITE_GRPC_TIMEOUT_UNARY:5s}
+        # The timeout for unary stream request
+        stream: ${SATELLITE_GRPC_TIMEOUT_STREAM:20s}
   servers:
     - plugin_name: "grpc-server"
       # The address of grpc server.
diff --git a/docs/en/setup/plugins/client_grpc-client.md b/docs/en/setup/plugins/client_grpc-client.md
index e56fed9..5ece8d9 100755
--- a/docs/en/setup/plugins/client_grpc-client.md
+++ b/docs/en/setup/plugins/client_grpc-client.md
@@ -41,6 +41,13 @@
 
 # How frequently to check the connection(second)
 check_period: 5
+
+# The gRPC send request timeout
+timeout:
+  # The timeout for unary single request
+  unary: 5s
+  # The timeout for unary stream request
+  stream: 20s
 ```
 ## Configuration
 |Name|Type|Description|
@@ -68,4 +75,5 @@
 | insecure_skip_verify | bool | Controls whether a client verifies the server's certificate chain and host name. |
 | authentication | string | The auth value when send request |
 | check_period | int | How frequently to check the connection(second) |
+| timeout | grpc.TimeoutConfig | The gRPC send request timeout |
 
diff --git a/internal/satellite/config/loader_test.go b/internal/satellite/config/loader_test.go
index 1dcddcc..42669bf 100644
--- a/internal/satellite/config/loader_test.go
+++ b/internal/satellite/config/loader_test.go
@@ -98,6 +98,10 @@
 				"check_period":           5,
 				"authentication":         "",
 				"finder_type":            "static",
+				"timeout": map[string]interface{}{
+					"unary":  "5s",
+					"stream": "20s",
+				},
 				"kubernetes_config": map[string]interface{}{
 					"api_server": "",
 					"basic_auth": map[string]interface{}{
diff --git a/internal/satellite/module/gatherer/receiver_gatherer.go b/internal/satellite/module/gatherer/receiver_gatherer.go
index 9185e97..caf4657 100644
--- a/internal/satellite/module/gatherer/receiver_gatherer.go
+++ b/internal/satellite/module/gatherer/receiver_gatherer.go
@@ -39,6 +39,8 @@
 	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
 )
 
+var enqueueErrorPrintInterval = 20 * time.Second
+
 type ReceiverGatherer struct {
 	// config
 	config *api.GathererConfig
@@ -56,6 +58,9 @@
 
 	// sync invoker
 	processor processor.Processor
+
+	lastEnqueueErrorPrintTime time.Time
+	enqueueErrorCounter       map[string]int
 }
 
 func (r *ReceiverGatherer) Prepare() error {
@@ -71,6 +76,8 @@
 	}
 	r.receiveCounter = telemetry.NewCounter("gatherer_receive_count", "Total number of the receiving count in the Gatherer.", "pipe", "status")
 	r.queueOutputCounter = telemetry.NewCounter("queue_output_count", "Total number of the output count in the Queue of Gatherer.", "pipe", "status")
+	r.lastEnqueueErrorPrintTime = time.Now()
+	r.enqueueErrorCounter = make(map[string]int)
 	return nil
 }
 
@@ -88,11 +95,7 @@
 				r.receiveCounter.Inc(r.config.PipeName, "all")
 				err := r.runningQueue.Enqueue(e)
 				if err != nil {
-					r.receiveCounter.Inc(r.config.PipeName, "abandoned")
-					log.Logger.WithFields(logrus.Fields{
-						"pipe":  r.config.PipeName,
-						"queue": r.runningQueue.Name(),
-					}).Errorf("error in enqueue: %v", err)
+					r.recordEnqueueError(err)
 				}
 			case <-childCtx.Done():
 				cancel()
@@ -107,6 +110,23 @@
 	wg.Wait()
 }
 
+func (r *ReceiverGatherer) recordEnqueueError(err error) {
+	r.receiveCounter.Inc(r.config.PipeName, "abandoned")
+	r.enqueueErrorCounter[err.Error()]++
+	if time.Now().After(r.lastEnqueueErrorPrintTime.Add(enqueueErrorPrintInterval)) {
+		for e, count := range r.enqueueErrorCounter {
+			log.Logger.WithFields(logrus.Fields{
+				"pipe":  r.config.PipeName,
+				"queue": r.runningQueue.Name(),
+				"count": count,
+			}).Errorf("error in enqueue: %v", e)
+		}
+
+		r.lastEnqueueErrorPrintTime = time.Now()
+		r.enqueueErrorCounter = make(map[string]int)
+	}
+}
+
 func (r *ReceiverGatherer) consumeQueue(ctx context.Context, p int, wg *sync.WaitGroup) {
 	go func() {
 		childCtx, cancel := context.WithCancel(ctx)
diff --git a/plugins/client/grpc/client.go b/plugins/client/grpc/client.go
index d5c9ea8..ad326f9 100644
--- a/plugins/client/grpc/client.go
+++ b/plugins/client/grpc/client.go
@@ -49,6 +49,8 @@
 	Authentication     string `mapstructure:"authentication"`       // The auth value when send request
 	CheckPeriod        int    `mapstructure:"check_period"`         // How frequently to check the connection(second)
 
+	Timeout TimeoutConfig `mapstructure:"timeout"` // The gRPC send request timeout
+
 	// components
 	status    api.ClientStatus
 	client    *grpc.ClientConn
@@ -57,6 +59,11 @@
 	cancel    context.CancelFunc // Parent ctx cancel function
 }
 
+type TimeoutConfig struct {
+	Unary  string `mapstructure:"unary"`  // The timeout for unary single request
+	Stream string `mapstructure:"stream"` // The timeout for unary stream request
+}
+
 func (c *Client) Name() string {
 	return Name
 }
@@ -109,6 +116,13 @@
 
 # How frequently to check the connection(second)
 check_period: 5
+
+# The gRPC send request timeout
+timeout:
+  # The timeout for unary single request
+  unary: 5s
+  # The timeout for unary stream request
+  stream: 20s
 `
 }
 
diff --git a/plugins/client/grpc/client_config.go b/plugins/client/grpc/client_config.go
index b3f8e20..67641db 100644
--- a/plugins/client/grpc/client_config.go
+++ b/plugins/client/grpc/client_config.go
@@ -23,6 +23,7 @@
 	"crypto/x509"
 	"fmt"
 	"os"
+	"time"
 
 	"github.com/apache/skywalking-satellite/plugins/client/grpc/lb"
 
@@ -52,24 +53,39 @@
 		authHeader = metadata.New(map[string]string{"Authentication": c.Authentication})
 	}
 
+	unaryRequestTimeout, err := time.ParseDuration(c.Timeout.Unary)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse the unary request timeout: %v", err)
+	}
+	streamRequestTimeout, err := time.ParseDuration(c.Timeout.Stream)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse the stream request timeout: %v", err)
+	}
+
 	// append auth or report error
 	options = append(options, grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc,
 		cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
 		if authHeader != nil {
 			ctx = metadata.NewOutgoingContext(ctx, authHeader)
 		}
-		clientStream, err := streamer(ctx, desc, cc, method, opts...)
+		timeout, timeoutFunc := context.WithTimeout(ctx, streamRequestTimeout)
+		clientStream, err := streamer(timeout, desc, cc, method, opts...)
 		if err != nil {
+			timeoutFunc()
 			c.reportError(err)
+			return nil, err
 		}
-		return clientStream, err
+		streamWrapper := &timeoutClientStream{clientStream, timeoutFunc}
+		return streamWrapper, err
 	}))
 	grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{},
 		cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
 		if authHeader != nil {
 			ctx = metadata.NewOutgoingContext(ctx, authHeader)
 		}
-		err := invoker(ctx, method, req, reply, cc, opts...)
+		timeout, timeoutFunc := context.WithTimeout(ctx, unaryRequestTimeout)
+		defer timeoutFunc()
+		err := invoker(timeout, method, req, reply, cc, opts...)
 		if err != nil {
 			c.reportError(err)
 		}
@@ -82,6 +98,16 @@
 	return &options, nil
 }
 
+type timeoutClientStream struct {
+	grpc.ClientStream
+	timeoutFunc context.CancelFunc
+}
+
+func (t *timeoutClientStream) RecvMsg(m interface{}) error {
+	defer t.timeoutFunc()
+	return t.ClientStream.RecvMsg(m)
+}
+
 // configTLS loads and parse the TLS configs.
 func (c *Client) configTLS() (tc *tls.Config, tlsErr error) {
 	if err := checkTLSFile(c.CaPemPath); err != nil {
diff --git a/plugins/client/grpc/client_sniffer.go b/plugins/client/grpc/client_sniffer.go
index 2865c7c..e909410 100644
--- a/plugins/client/grpc/client_sniffer.go
+++ b/plugins/client/grpc/client_sniffer.go
@@ -25,6 +25,7 @@
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/status"
 
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	"github.com/apache/skywalking-satellite/plugins/client/api"
 )
 
@@ -37,6 +38,7 @@
 		select {
 		case <-timeTicker.C:
 			state := c.client.GetState()
+			log.Logger.Debugf("current grpc client state: %s", state)
 			if state == connectivity.Shutdown || state == connectivity.TransientFailure {
 				c.updateStatus(api.Disconnect)
 			} else if state == connectivity.Ready || state == connectivity.Idle {
diff --git a/plugins/queue/partition/partitioned_queue.go b/plugins/queue/partition/partitioned_queue.go
index b8b01c0..550899c 100644
--- a/plugins/queue/partition/partitioned_queue.go
+++ b/plugins/queue/partition/partitioned_queue.go
@@ -149,7 +149,7 @@
 			return checkPartition, nil
 		}
 	}
-	return 0, fmt.Errorf("the queue is full")
+	return 0, api.ErrFull
 }
 
 func (p *PartitionedQueue) registerQueueTelemetry(pipeline string) {