Improve error log for frame size too big and maxMessageSize (#459)
### Motivation
This PR is to improve error log for `frame size too big` on the consumer. We have seen a number of frame size too big error and I would like to trace what the exact maxMessageSize is set to that is not default. Debug log was not able to be enabled on the production system.
### Modifications
Log improvement mostly
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 6619fe6..433c8a8 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -302,7 +302,7 @@
cmd.Error.GetMessage())
return false
}
- if cmd.Connected.MaxMessageSize != nil {
+ if cmd.Connected.MaxMessageSize != nil && *cmd.Connected.MaxMessageSize > 0 {
c.log.Debug("Got MaxMessageSize from handshake response:", *cmd.Connected.MaxMessageSize)
c.maxMessageSize = *cmd.Connected.MaxMessageSize
} else {
diff --git a/pulsar/internal/connection_reader.go b/pulsar/internal/connection_reader.go
index 8035324..62a6526 100644
--- a/pulsar/internal/connection_reader.go
+++ b/pulsar/internal/connection_reader.go
@@ -19,6 +19,7 @@
import (
"bufio"
+ "fmt"
"io"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
@@ -54,7 +55,7 @@
if headersAndPayload != nil {
payloadLen = headersAndPayload.ReadableBytes()
}
- r.cnx.log.Debug("Got command! ", cmd, " with payload size: ", payloadLen)
+ r.cnx.log.Debug("Got command! ", cmd, " with payload size: ", payloadLen, " maxMsgSize: ", r.cnx.maxMessageSize)
r.cnx.receivedCommand(cmd, headersAndPayload)
}
}
@@ -73,10 +74,12 @@
// We have enough to read frame size
frameSize := r.buffer.ReadUint32()
- if r.cnx.maxMessageSize != 0 && int32(frameSize) > (r.cnx.maxMessageSize+MessageFramePadding) {
- r.cnx.log.Warnf("Received too big frame size. size=%d", frameSize)
+ maxFrameSize := r.cnx.maxMessageSize + MessageFramePadding
+ if r.cnx.maxMessageSize != 0 && int32(frameSize) > maxFrameSize {
+ frameSizeError := fmt.Errorf("received too big frame size=%d maxFrameSize=%d", frameSize, maxFrameSize)
+ r.cnx.log.Error(frameSizeError)
r.cnx.TriggerClose()
- return nil, nil, errors.New("Frame size too big")
+ return nil, nil, frameSizeError
}
// Next, we read the rest of the frame
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 02ee7cf..34def20 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -351,7 +351,7 @@
p.log.WithError(errMessageTooLarge).
WithField("size", len(payload)).
WithField("properties", msg.Properties).
- Error()
+ Errorf("MaxMessageSize %d", int(p.cnx.GetMaxMessageSize()))
p.metrics.PublishErrorsMsgTooLarge.Inc()
return
}