blob: a4d5e5f77ebd4c666869e865ed4bcfa894fc1935 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package internal
import (
"errors"
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)
const (
// MaxFrameSize limits the maximum size that pulsar allows for messages to be sent.
MaxFrameSize = 5 * 1024 * 1024
// MessageFramePadding is the space reserved for metadata and other frame headers.
MessageFramePadding = 10 * 1024
// MaxMessageSize limits the message size for transfer: it is the frame size
// minus the padding reserved for metadata and frame headers.
MaxMessageSize = MaxFrameSize - MessageFramePadding
// magicCrc32c is the 16-bit magic number that is written in front of the
// checksum when a frame is serialized and verified when a frame is read.
magicCrc32c uint16 = 0x0e01
)
// ErrCorruptedMessage is the error returned by MessageReader when it has detected corrupted data,
// for example a wrong magic number in the message header or an error when unmarshalling
// the message metadata.
var ErrCorruptedMessage = errors.New("corrupted message")
// ErrEOM is the error returned by ReadMessage when no more input is available.
// Note that its message text is "EOF".
var ErrEOM = errors.New("EOF")
// ErrConnectionClosed is a sentinel error carrying the message "connection closed".
// NOTE(review): nothing visible here returns it; presumably it is used by the
// connection handling code elsewhere in the package — confirm.
var ErrConnectionClosed = errors.New("connection closed")
// NewMessageReader creates a MessageReader that parses the given buffer.
// The reader starts in non-batched mode.
func NewMessageReader(headersAndPayload Buffer) *MessageReader {
	reader := new(MessageReader)
	reader.buffer = headersAndPayload
	return reader
}
// NewMessageReaderFromArray creates a MessageReader over a byte slice by
// wrapping it with NewBufferWrapper first.
func NewMessageReaderFromArray(headersAndPayload []byte) *MessageReader {
	wrapped := NewBufferWrapper(headersAndPayload)
	return NewMessageReader(wrapped)
}
// MessageReader provides helper methods to parse
// the metadata and messages from the binary format.
//
// Wire format for a message
//
// Old format (single message)
// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
//
// Batch format
// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD]
// [METADATA_SIZE][METADATA][PAYLOAD]
//
type MessageReader struct {
// buffer is the source of the bytes being parsed
buffer Buffer
// true if we are parsing a batched message - set after parsing the message metadata
batched bool
}
// readChecksum consumes the [MAGIC_NUMBER][CHECKSUM] prefix from the buffer
// and returns the checksum stored in it.
//
// It returns a "missing message header" error when fewer than 6 bytes are
// readable, and ErrCorruptedMessage when the magic number is not magicCrc32c.
func (r *MessageReader) readChecksum() (uint32, error) {
	// The prefix is a uint16 magic number followed by a uint32 checksum.
	if r.buffer.ReadableBytes() < 6 {
		return 0, errors.New("missing message header")
	}

	if magic := r.buffer.ReadUint16(); magic != magicCrc32c {
		return 0, ErrCorruptedMessage
	}

	return r.buffer.ReadUint32(), nil
}
// ReadMessageMetadata parses the frame header and the message metadata from
// the current position of the buffer.
//
// Wire format
// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA]
//
// The stored checksum is compared with the checksum computed over the
// buffer's readable slice right after the header has been consumed.
// As a side effect the reader records whether the message is batched
// (NumMessagesInBatch is set), which decides how ReadMessage parses the
// remaining bytes.
//
// It returns the error from readChecksum when the header is missing or
// carries a wrong magic number, a "checksum mismatch" error when validation
// fails, and ErrCorruptedMessage when the metadata is truncated or cannot be
// unmarshalled.
func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
	checksum, err := r.readChecksum()
	if err != nil {
		return nil, err
	}

	// validate checksum
	computedChecksum := Crc32cCheckSum(r.buffer.ReadableSlice())
	if checksum != computedChecksum {
		return nil, fmt.Errorf("checksum mismatch received: 0x%x computed: 0x%x", checksum, computedChecksum)
	}

	// METADATA_SIZE is read as a uint32. Check the lengths explicitly instead
	// of relying on how the Buffer implementation reacts to reading past the
	// end (not visible from here).
	if r.buffer.ReadableBytes() < 4 {
		return nil, ErrCorruptedMessage
	}
	size := r.buffer.ReadUint32()
	if size > r.buffer.ReadableBytes() {
		return nil, ErrCorruptedMessage
	}

	var meta pb.MessageMetadata
	if err := proto.Unmarshal(r.buffer.Read(size), &meta); err != nil {
		return nil, ErrCorruptedMessage
	}

	// Assign instead of only ever setting to true, so a reader that parsed a
	// batched message earlier does not keep a stale flag for a later
	// non-batched one.
	r.batched = meta.NumMessagesInBatch != nil

	return &meta, nil
}
// ReadMessage returns the next message from the buffer.
//
// For a non-batched message the metadata is nil and the payload is all the
// remaining readable bytes. For a batched message it returns the next entry's
// SingleMessageMetadata together with its payload.
//
// ErrEOM is returned when no readable bytes are left in a buffer whose
// capacity is non-zero.
func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) {
	exhausted := r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0

	switch {
	case exhausted:
		return nil, nil, ErrEOM
	case r.batched:
		return r.readSingleMessage()
	default:
		return r.readMessage()
	}
}
// readMessage reads a non-batched message: everything still readable in the
// buffer is the payload and there is no per-message metadata.
//
// Wire format
// [PAYLOAD]
func (r *MessageReader) readMessage() (*pb.SingleMessageMetadata, []byte, error) {
	remaining := r.buffer.ReadableBytes()
	payload := r.buffer.Read(remaining)
	return nil, payload, nil
}
// readSingleMessage reads the next entry of a batched message and returns its
// metadata and payload.
//
// Wire format
// [METADATA_SIZE][METADATA][PAYLOAD]
//
// Both the metadata size and the payload size come from the wire, so they are
// checked against the bytes actually available before being used. It returns
// ErrCorruptedMessage when the entry is truncated, and the protobuf error
// when the metadata cannot be unmarshalled.
func (r *MessageReader) readSingleMessage() (*pb.SingleMessageMetadata, []byte, error) {
	// METADATA_SIZE is read as a uint32; make sure it is there at all.
	if r.buffer.ReadableBytes() < 4 {
		return nil, nil, ErrCorruptedMessage
	}
	size := r.buffer.ReadUint32()
	if size > r.buffer.ReadableBytes() {
		return nil, nil, ErrCorruptedMessage
	}

	var meta pb.SingleMessageMetadata
	if err := proto.Unmarshal(r.buffer.Read(size), &meta); err != nil {
		return nil, nil, err
	}

	// A negative payload size (presumably possible, since the getter's result
	// needs a conversion — confirm its type) wraps to a very large uint32 and
	// is rejected by the same comparison.
	payloadSize := uint32(meta.GetPayloadSize())
	if payloadSize > r.buffer.ReadableBytes() {
		return nil, nil, ErrCorruptedMessage
	}

	return &meta, r.buffer.Read(payloadSize), nil
}
// ResetBuffer replaces the buffer the reader parses from. The batched flag is
// left untouched, so metadata parsed before the reset still decides how
// ReadMessage interprets the new buffer.
func (r *MessageReader) ResetBuffer(buffer Buffer) {
r.buffer = buffer
}
// baseCommand wraps msg in a pb.BaseCommand of the given type, storing it in
// the field that corresponds to cmdType.
//
// msg must have the concrete type that belongs to cmdType; the type assertion
// panics otherwise. An unsupported cmdType panics as well.
func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
	base := new(pb.BaseCommand)
	base.Type = &cmdType

	switch cmdType {
	case pb.BaseCommand_CONNECT:
		base.Connect = msg.(*pb.CommandConnect)

	case pb.BaseCommand_LOOKUP:
		base.LookupTopic = msg.(*pb.CommandLookupTopic)

	case pb.BaseCommand_PARTITIONED_METADATA:
		base.PartitionMetadata = msg.(*pb.CommandPartitionedTopicMetadata)

	case pb.BaseCommand_PRODUCER:
		base.Producer = msg.(*pb.CommandProducer)

	case pb.BaseCommand_SUBSCRIBE:
		base.Subscribe = msg.(*pb.CommandSubscribe)

	case pb.BaseCommand_FLOW:
		base.Flow = msg.(*pb.CommandFlow)

	case pb.BaseCommand_PING:
		base.Ping = msg.(*pb.CommandPing)

	case pb.BaseCommand_PONG:
		base.Pong = msg.(*pb.CommandPong)

	case pb.BaseCommand_SEND:
		base.Send = msg.(*pb.CommandSend)

	case pb.BaseCommand_CLOSE_PRODUCER:
		base.CloseProducer = msg.(*pb.CommandCloseProducer)

	case pb.BaseCommand_CLOSE_CONSUMER:
		base.CloseConsumer = msg.(*pb.CommandCloseConsumer)

	case pb.BaseCommand_ACK:
		base.Ack = msg.(*pb.CommandAck)

	case pb.BaseCommand_SEEK:
		base.Seek = msg.(*pb.CommandSeek)

	case pb.BaseCommand_UNSUBSCRIBE:
		base.Unsubscribe = msg.(*pb.CommandUnsubscribe)

	case pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES:
		base.RedeliverUnacknowledgedMessages = msg.(*pb.CommandRedeliverUnacknowledgedMessages)

	case pb.BaseCommand_GET_TOPICS_OF_NAMESPACE:
		base.GetTopicsOfNamespace = msg.(*pb.CommandGetTopicsOfNamespace)

	case pb.BaseCommand_GET_LAST_MESSAGE_ID:
		base.GetLastMessageId = msg.(*pb.CommandGetLastMessageId)

	case pb.BaseCommand_AUTH_RESPONSE:
		base.AuthResponse = msg.(*pb.CommandAuthResponse)

	default:
		panic(fmt.Sprintf("Missing command type: %v", cmdType))
	}

	return base
}
// addSingleMessageToBatch appends one batch entry to wb.
//
// Wire format
// [METADATA_SIZE][METADATA][PAYLOAD]
//
// It panics if smm cannot be serialized.
func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload []byte) {
	size := uint32(smm.Size())
	wb.WriteUint32(size)

	// Serialize the metadata directly into the buffer's writable region,
	// then mark those bytes as written.
	wb.ResizeIfNeeded(size)
	target := wb.WritableSlice()[:size]
	if _, err := smm.MarshalToSizedBuffer(target); err != nil {
		panic(fmt.Sprintf("Protobuf serialization error: %v", err))
	}
	wb.WrittenBytes(size)

	wb.Write(payload)
}
// serializeBatch writes a complete send frame into wb: the frame size, the
// serialized cmdSend, the magic number and checksum, the serialized
// msgMetadata and finally the output of compressionProvider applied to
// uncompressedPayload.
//
// The frame size and the checksum are only known once everything else has
// been written, so placeholders are written first and patched at the end.
// It panics if either protobuf message fails to serialize.
func serializeBatch(wb Buffer,
cmdSend *pb.BaseCommand,
msgMetadata *pb.MessageMetadata,
uncompressedPayload Buffer,
compressionProvider compression.Provider) {
// Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
cmdSize := uint32(proto.Size(cmdSend))
msgMetadataSize := uint32(proto.Size(msgMetadata))
// Remember where the frame size goes; it is patched once the frame is complete
frameSizeIdx := wb.WriterIndex()
wb.WriteUint32(0) // Skip frame size until we know the size
// The frame size is measured from here, so it excludes the size field itself
frameStartIdx := wb.WriterIndex()
// Write cmd
wb.WriteUint32(cmdSize)
wb.ResizeIfNeeded(cmdSize)
_, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
if err != nil {
panic(fmt.Sprintf("Protobuf error when serializing cmdSend: %v", err))
}
wb.WrittenBytes(cmdSize)
// Create checksum placeholder
wb.WriteUint16(magicCrc32c)
checksumIdx := wb.WriterIndex()
wb.WriteUint32(0) // skip 4 bytes of checksum
// Write metadata
// The checksum computed below starts at this index, i.e. at METADATA_SIZE
metadataStartIdx := wb.WriterIndex()
wb.WriteUint32(msgMetadataSize)
wb.ResizeIfNeeded(msgMetadataSize)
_, err = msgMetadata.MarshalToSizedBuffer(wb.WritableSlice()[:msgMetadataSize])
if err != nil {
panic(fmt.Sprintf("Protobuf error when serializing msgMetadata: %v", err))
}
wb.WrittenBytes(msgMetadataSize)
// Make sure the buffer has enough space to hold the compressed data
// and perform the compression in-place
maxSize := uint32(compressionProvider.CompressMaxSize(int(uncompressedPayload.ReadableBytes())))
wb.ResizeIfNeeded(maxSize)
// NOTE(review): Compress is handed a zero-length slice at the writer position;
// presumably it appends into wb's storage, which was just resized — confirm
// against the compression.Provider contract.
b := compressionProvider.Compress(wb.WritableSlice()[:0], uncompressedPayload.ReadableSlice())
wb.WrittenBytes(uint32(len(b)))
// Write checksum at created checksum-placeholder
frameEndIdx := wb.WriterIndex()
// The checksum covers everything from METADATA_SIZE to the end of the payload
checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, frameEndIdx-metadataStartIdx))
// Set Sizes and checksum in the fixed-size header
wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame
wb.PutUint32(checksum, checksumIdx)
}
// ConvertFromStringMap converts a string map into a slice of KeyValue
// entries, one per map entry. The order of the entries follows map iteration
// order and is therefore not deterministic.
func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
	entries := make([]*pb.KeyValue, 0, len(m))
	for key, value := range m {
		entry := &pb.KeyValue{
			Key:   proto.String(key),
			Value: proto.String(value),
		}
		entries = append(entries, entry)
	}
	return entries
}
// ConvertToStringMap converts a slice of KeyValue entries into a string map.
// When a key occurs more than once the last occurrence wins. Every entry must
// have both Key and Value set; they are dereferenced without a nil check.
func ConvertToStringMap(pbb []*pb.KeyValue) map[string]string {
	result := make(map[string]string, len(pbb))
	for i := range pbb {
		entry := pbb[i]
		result[*entry.Key] = *entry.Value
	}
	return result
}