blob: 2800b92c26f845b8876a2b0ddbbc096a2ed11ac7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package internal
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"github.com/apache/pulsar-client-go/pkg/pb"
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
)
const (
	// MaxFrameSize is the largest frame size, in bytes, that Pulsar allows for
	// messages to be sent (5 MiB).
	MaxFrameSize = 5 << 20

	// magicCrc32c is the 2-byte marker written just before the 4-byte
	// CRC32-C checksum field of a frame.
	magicCrc32c uint16 = 0x0e01
)
// baseCommandSetters maps every command type supported by baseCommand to a
// function that stores msg in the matching field of a BaseCommand. Each setter
// type-asserts msg to the concrete command type, so it panics when msg does
// not correspond to the command type it is registered under.
var baseCommandSetters = map[pb.BaseCommand_Type]func(cmd *pb.BaseCommand, msg proto.Message){
	pb.BaseCommand_CONNECT: func(cmd *pb.BaseCommand, msg proto.Message) {
		cmd.Connect = msg.(*pb.CommandConnect)
	},
	pb.BaseCommand_LOOKUP: func(cmd *pb.BaseCommand, msg proto.Message) {
		cmd.LookupTopic = msg.(*pb.CommandLookupTopic)
	},
	pb.BaseCommand_PARTITIONED_METADATA: func(cmd *pb.BaseCommand, msg proto.Message) {
		cmd.PartitionMetadata = msg.(*pb.CommandPartitionedTopicMetadata)
	},
	pb.BaseCommand_PRODUCER: func(cmd *pb.BaseCommand, msg proto.Message) {
		cmd.Producer = msg.(*pb.CommandProducer)
	},
	pb.BaseCommand_SUBSCRIBE: func(cmd *pb.BaseCommand, msg proto.Message) {
		cmd.Subscribe = msg.(*pb.CommandSubscribe)
	},
	pb.BaseCommand_FLOW: func(cmd *pb.BaseCommand, msg proto.Message) {
		cmd.Flow = msg.(*pb.CommandFlow)
	},
	pb.BaseCommand_PING: func(cmd *pb.BaseCommand, msg proto.Message) {
		cmd.Ping = msg.(*pb.CommandPing)
	},
	pb.BaseCommand_PONG: func(cmd *pb.BaseCommand, msg proto.Message) {
		cmd.Pong = msg.(*pb.CommandPong)
	},
	pb.BaseCommand_SEND: func(cmd *pb.BaseCommand, msg proto.Message) {
		cmd.Send = msg.(*pb.CommandSend)
	},
	pb.BaseCommand_CLOSE_PRODUCER: func(cmd *pb.BaseCommand, msg proto.Message) {
		cmd.CloseProducer = msg.(*pb.CommandCloseProducer)
	},
	pb.BaseCommand_CLOSE_CONSUMER: func(cmd *pb.BaseCommand, msg proto.Message) {
		cmd.CloseConsumer = msg.(*pb.CommandCloseConsumer)
	},
	pb.BaseCommand_ACK: func(cmd *pb.BaseCommand, msg proto.Message) {
		cmd.Ack = msg.(*pb.CommandAck)
	},
	pb.BaseCommand_SEEK: func(cmd *pb.BaseCommand, msg proto.Message) {
		cmd.Seek = msg.(*pb.CommandSeek)
	},
	pb.BaseCommand_UNSUBSCRIBE: func(cmd *pb.BaseCommand, msg proto.Message) {
		cmd.Unsubscribe = msg.(*pb.CommandUnsubscribe)
	},
	pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES: func(cmd *pb.BaseCommand, msg proto.Message) {
		cmd.RedeliverUnacknowledgedMessages = msg.(*pb.CommandRedeliverUnacknowledgedMessages)
	},
}

// baseCommand wraps msg in a BaseCommand envelope tagged with cmdType.
//
// It panics via log.Panic when cmdType has no registered setter, and panics
// on a failed type assertion when msg's concrete type does not match cmdType.
func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
	setPayload, supported := baseCommandSetters[cmdType]
	if !supported {
		log.Panic("Missing command type: ", cmdType)
	}

	envelope := &pb.BaseCommand{Type: &cmdType}
	setPayload(envelope, msg)
	return envelope
}
// addSingleMessageToBatch appends one entry to a batch being built in wb.
// The entry consists of the serialized metadata's length (written with
// WriteUint32), the serialized SingleMessageMetadata, and the raw payload.
// A protobuf serialization failure is treated as fatal (log.Fatal).
func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload []byte) {
	metaBytes, marshalErr := proto.Marshal(smm)
	if marshalErr != nil {
		log.WithError(marshalErr).Fatal("Protobuf serialization error")
	}

	metaLen := uint32(len(metaBytes))
	wb.WriteUint32(metaLen)
	wb.Write(metaBytes)
	wb.Write(payload)
}
// ParseMessage decodes the metadata and payload(s) of a message frame.
//
// headersAndPayload is laid out as
//
//	[MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
//
// where the 2-byte magic number and 4-byte checksum are optional. When the
// magic number is present, the checksum is verified against every byte that
// follows the checksum field.
//
// An empty input is a "simple" command and yields (nil, nil, nil). If the
// metadata has NumMessagesInBatch set to a positive value, PAYLOAD is decoded
// as a batch and payloadList has one entry per batched message. Otherwise
// payloadList holds at most one entry containing all remaining bytes.
//
// Errors are returned for the following:
//   - truncated input
//   - a metadata size (or non-batch payload size) larger than MaxFrameSize
//   - protobuf decoding failures
//   - checksum mismatches
func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloadList [][]byte, err error) {
	// Reusable scratch buffer for 4-byte big-endian fields.
	buf32 := make([]byte, 4)

	// Limit reads to this frame; lr.N tracks how many bytes remain unread.
	lr := &io.LimitedReader{
		N: int64(len(headersAndPayload)),
		R: bytes.NewReader(headersAndPayload),
	}

	// An empty frame carries no metadata or payload: this is a "simple" command.
	if lr.N <= 0 {
		return nil, nil, nil
	}

	// The next 4 bytes take one of two forms:
	//   - the 2-byte magic number plus the first half of the checksum
	//   - the 4-byte metadata size
	if _, err = io.ReadFull(lr, buf32); err != nil {
		return nil, nil, err
	}

	// chksum accumulates the bytes that follow the checksum field. Without a
	// magic number nothing is fed to it and expectedChksum stays nil.
	var chksum CheckSum
	var expectedChksum []byte
	if binary.BigEndian.Uint16(buf32[:2]) == magicCrc32c {
		expectedChksum = make([]byte, 4)
		// buf32 already holds the magic number and the first 2 checksum bytes.
		copy(expectedChksum, buf32[2:])
		// Read the remaining 2 bytes of the checksum.
		if _, err = io.ReadFull(lr, expectedChksum[2:]); err != nil {
			return nil, nil, err
		}
		// Feed everything consumed from here on into the checksum.
		lr.R = io.TeeReader(lr.R, &chksum)
		// Refill buf32 with the metadata size. Without a magic number and
		// checksum, buf32 would already contain it.
		if _, err = io.ReadFull(lr, buf32); err != nil {
			return nil, nil, err
		}
	}

	metadataSize := binary.BigEndian.Uint32(buf32)
	// The size comes from the wire; guard against allocating a large buffer.
	if metadataSize > MaxFrameSize {
		return nil, nil, fmt.Errorf("frame metadata size (%d) "+
			"cannot be greater than max frame size (%d)", metadataSize, MaxFrameSize)
	}

	// Read the protobuf-encoded metadata.
	metaBuf := make([]byte, metadataSize)
	if _, err = io.ReadFull(lr, metaBuf); err != nil {
		return nil, nil, err
	}
	msgMeta = new(pb.MessageMetadata)
	if err = proto.Unmarshal(metaBuf, msgMeta); err != nil {
		return nil, nil, err
	}

	// NOTE(review): the explicit nil check presumably guards against the getter
	// returning a proto default for an unset field; confirm before removing it.
	numMsg := msgMeta.GetNumMessagesInBatch()
	if numMsg > 0 && msgMeta.NumMessagesInBatch != nil {
		payloads := make([]byte, lr.N)
		if _, err = io.ReadFull(lr, payloads); err != nil {
			return nil, nil, err
		}
		// The frame is fully consumed, so verify integrity before decoding. A
		// corrupted batch is then reported as a checksum mismatch instead of a
		// misleading decode error from parsing garbage.
		if err = computeChecksum(chksum, expectedChksum); err != nil {
			return nil, nil, err
		}

		singleMessages, decodeErr := decodeBatchPayload(payloads, numMsg)
		if decodeErr != nil {
			return nil, nil, decodeErr
		}

		payloadList = make([][]byte, 0, len(singleMessages))
		for _, singleMsg := range singleMessages {
			// NOTE(review): these fields are overwritten on every iteration, so
			// the returned msgMeta carries the PartitionKey/Properties/EventTime
			// of the LAST message in the batch only. Confirm callers expect this.
			msgMeta.PartitionKey = singleMsg.SingleMeta.PartitionKey
			msgMeta.Properties = singleMsg.SingleMeta.Properties
			msgMeta.EventTime = singleMsg.SingleMeta.EventTime
			payloadList = append(payloadList, singleMsg.SinglePayload)
		}
		return msgMeta, payloadList, nil
	}

	// Anything left in the frame is the payload and can be any sequence of bytes.
	payloadList = make([][]byte, 0, 1)
	if lr.N > 0 {
		// Guard against allocating a large buffer.
		if lr.N > MaxFrameSize {
			return nil, nil, fmt.Errorf("frame payload size (%d) cannot be greater than max frame size (%d)",
				lr.N, MaxFrameSize)
		}
		payload := make([]byte, lr.N)
		if _, err = io.ReadFull(lr, payload); err != nil {
			return nil, nil, err
		}
		payloadList = append(payloadList, payload)
	}

	if err = computeChecksum(chksum, expectedChksum); err != nil {
		return nil, nil, err
	}
	return msgMeta, payloadList, nil
}
// computeChecksum obtains the digest accumulated in chksum (via compute) and
// compares it byte-for-byte with expectedChksum. It returns nil on a match,
// and otherwise an error showing both values in hexadecimal.
func computeChecksum(chksum CheckSum, expectedChksum []byte) error {
	switch actual := chksum.compute(); {
	case bytes.Equal(actual, expectedChksum):
		return nil
	default:
		return fmt.Errorf("checksum mismatch: computed (0x%X) does not match given checksum (0x%X)",
			actual, expectedChksum)
	}
}
// serializeBatch writes a complete frame for cmdSend, msgMetadata and payload
// into wb.
//
// Wire format:
//
//	[TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
//
// TOTAL_SIZE counts every byte that follows it. The CRC32-C checksum covers
// METADATA_SIZE, METADATA and PAYLOAD, and is patched into its placeholder
// once those bytes are written. Protobuf serialization failures are fatal
// (log.Fatal).
func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageMetadata, payload []byte) {
	// Serialize both protobufs up front. The size prefixes then come from the
	// exact bytes written, not from a separate proto.Size pass over each message.
	cmdBytes, err := proto.Marshal(cmdSend)
	if err != nil {
		log.WithError(err).Fatal("Protobuf error when serializing cmdSend")
	}
	metaBytes, err := proto.Marshal(msgMetadata)
	if err != nil {
		log.WithError(err).Fatal("Protobuf error when serializing msgMetadata")
	}

	const (
		sizeFieldLen        = 4     // each uint32 size prefix
		magicAndChecksumLen = 2 + 4 // magic number + checksum
	)
	// cmdLength + cmd + magic + checksum + metadataLength + metadata + payload
	totalSize := sizeFieldLen + len(cmdBytes) + magicAndChecksumLen +
		sizeFieldLen + len(metaBytes) + len(payload)

	wb.WriteUint32(uint32(totalSize)) // external frame

	// Command.
	wb.WriteUint32(uint32(len(cmdBytes)))
	wb.Write(cmdBytes)

	// Magic number followed by a 4-byte checksum placeholder.
	wb.WriteUint16(magicCrc32c)
	checksumIdx := wb.WriterIndex()
	wb.WriteUint32(0)

	// Metadata and payload: the region covered by the checksum.
	metadataStartIdx := wb.WriterIndex()
	wb.WriteUint32(uint32(len(metaBytes)))
	wb.Write(metaBytes)
	wb.Write(payload)

	// Compute the checksum over the covered region and fill in the placeholder.
	endIdx := wb.WriterIndex()
	checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, endIdx-metadataStartIdx))
	wb.PutUint32(checksum, checksumIdx)
}
// singleMessage represents one element of a batched payload, as produced by
// decodeBatchPayload.
type singleMessage struct {
	// SingleMetaSize is the byte length of the serialized SingleMeta, as read
	// from the 4-byte length prefix that precedes it in the batch.
	SingleMetaSize uint32
	// SingleMeta is the per-message metadata decoded from the batch.
	SingleMeta *pb.SingleMessageMetadata
	// SinglePayload holds this message's body bytes.
	SinglePayload []byte
}
// decodeBatchPayload splits a batched payload into its individual messages.
//
// When a producer batches messages, the payload is a sequence of batchNum
// entries, each laid out as
//
//	[SINGLE_META_SIZE (4 bytes, big-endian)] [SingleMessageMetadata] [PAYLOAD]
//
// where PAYLOAD's length is the metadata's payload size.
//
// Every size comes from the wire and is validated against the bytes that
// actually remain before anything is allocated. A size that cannot fit yields
// io.ErrUnexpectedEOF rather than a huge allocation. A negative batchNum or
// payload size yields an error rather than a makeslice panic.
func decodeBatchPayload(bp []byte, batchNum int32) ([]*singleMessage, error) {
	if batchNum < 0 {
		return nil, fmt.Errorf("invalid number of messages in batch: %d", batchNum)
	}

	// Each entry needs at least its 4-byte size prefix, so never reserve more
	// slots than bp could possibly hold, whatever batchNum claims.
	capacity := int(batchNum)
	if maxEntries := len(bp) / 4; capacity > maxEntries {
		capacity = maxEntries
	}

	buf32 := make([]byte, 4)
	rdBuf := bytes.NewReader(bp)
	singleMsgList := make([]*singleMessage, 0, capacity)
	for i := int32(0); i < batchNum; i++ {
		// singleMetaSize
		if _, err := io.ReadFull(rdBuf, buf32); err != nil {
			return nil, err
		}
		singleMetaSize := binary.BigEndian.Uint32(buf32)
		if int64(singleMetaSize) > int64(rdBuf.Len()) {
			return nil, io.ErrUnexpectedEOF
		}

		// singleMeta
		singleMetaBuf := make([]byte, singleMetaSize)
		if _, err := io.ReadFull(rdBuf, singleMetaBuf); err != nil {
			return nil, err
		}
		singleMeta := new(pb.SingleMessageMetadata)
		if err := proto.Unmarshal(singleMetaBuf, singleMeta); err != nil {
			return nil, err
		}

		// payload
		payloadSize := singleMeta.GetPayloadSize()
		if payloadSize < 0 {
			return nil, fmt.Errorf("invalid single message payload size: %d", payloadSize)
		}
		if int64(payloadSize) > int64(rdBuf.Len()) {
			return nil, io.ErrUnexpectedEOF
		}
		singlePayload := make([]byte, payloadSize)
		if _, err := io.ReadFull(rdBuf, singlePayload); err != nil {
			return nil, err
		}

		singleMsgList = append(singleMsgList, &singleMessage{
			SingleMetaSize: singleMetaSize,
			SingleMeta:     singleMeta,
			SinglePayload:  singlePayload,
		})
	}
	return singleMsgList, nil
}
// ConvertFromStringMap converts a string map into a slice of protobuf KeyValue
// pairs, one per map entry. Because it ranges over a map, the order of the
// returned pairs is unspecified. A nil or empty map yields an empty, non-nil
// slice.
func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
	pairs := make([]*pb.KeyValue, 0, len(m))
	for key, value := range m {
		pairs = append(pairs, &pb.KeyValue{
			Key:   proto.String(key),
			Value: proto.String(value),
		})
	}
	return pairs
}
// ConvertToStringMap converts a slice of protobuf KeyValue pairs into a string
// map. Later pairs overwrite earlier ones with the same key.
//
// The input comes from decoded protobufs and may be malformed. Nil entries are
// skipped, and an unset Key or Value is read as "" through the generated
// getters. The original code dereferenced both pointers directly and panicked
// in these cases.
func ConvertToStringMap(pbb []*pb.KeyValue) map[string]string {
	m := make(map[string]string, len(pbb))
	for _, kv := range pbb {
		if kv == nil {
			continue
		}
		m[kv.GetKey()] = kv.GetValue()
	}
	return m
}