| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package internal |
| |
| import ( |
| "errors" |
| "fmt" |
| |
| "github.com/gogo/protobuf/proto" |
| |
| "github.com/apache/pulsar-client-go/pulsar/internal/compression" |
| pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" |
| ) |
| |
const (
	// MaxMessageSize is the largest message size, in bytes, allowed for
	// transfer (5 MiB).
	MaxMessageSize = 5 << 20
	// MessageFramePadding is the number of bytes set aside for metadata and
	// other frame headers (10 KiB).
	MessageFramePadding = 10 << 10
	// MaxFrameSize is the upper bound on a whole frame: the largest message
	// plus the frame padding.
	MaxFrameSize = MessageFramePadding + MaxMessageSize

	// magicCrc32c is the 2-byte magic number that precedes the 4-byte
	// checksum in the message header; readers reject a header that does not
	// start with it.
	magicCrc32c uint16 = 0x0e01
)
| |
var (
	// ErrCorruptedMessage reports that message data could not be parsed. It is
	// returned, for example, by ReadMessageMetadata when the header does not
	// start with the expected magic number or when the message metadata
	// cannot be unmarshalled.
	ErrCorruptedMessage = errors.New("corrupted message")

	// ErrEOM is returned by ReadMessage when no more input is available.
	ErrEOM = errors.New("EOF")

	// ErrConnectionClosed is the sentinel error for a closed connection.
	ErrConnectionClosed = errors.New("connection closed")
)
| |
| func NewMessageReader(headersAndPayload Buffer) *MessageReader { |
| return &MessageReader{ |
| buffer: headersAndPayload, |
| } |
| } |
| |
| func NewMessageReaderFromArray(headersAndPayload []byte) *MessageReader { |
| return NewMessageReader(NewBufferWrapper(headersAndPayload)) |
| } |
| |
| // MessageReader provides helper methods to parse |
| // the metadata and messages from the binary format |
| // Wire format for a messages |
| // |
| // Old format (single message) |
| // [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD] |
| // |
| // Batch format |
| // [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD] |
| // [METADATA_SIZE][METADATA][PAYLOAD] |
| // |
| type MessageReader struct { |
| buffer Buffer |
| // true if we are parsing a batched message - set after parsing the message metadata |
| batched bool |
| } |
| |
| // ReadChecksum |
| func (r *MessageReader) readChecksum() (uint32, error) { |
| if r.buffer.ReadableBytes() < 6 { |
| return 0, errors.New("missing message header") |
| } |
| // reader magic number |
| magicNumber := r.buffer.ReadUint16() |
| if magicNumber != magicCrc32c { |
| return 0, ErrCorruptedMessage |
| } |
| checksum := r.buffer.ReadUint32() |
| return checksum, nil |
| } |
| |
| func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) { |
| // Wire format |
| // [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] |
| |
| // read checksum |
| checksum, err := r.readChecksum() |
| if err != nil { |
| return nil, err |
| } |
| |
| // validate checksum |
| computedChecksum := Crc32cCheckSum(r.buffer.ReadableSlice()) |
| if checksum != computedChecksum { |
| return nil, fmt.Errorf("checksum mismatch received: 0x%x computed: 0x%x", checksum, computedChecksum) |
| } |
| |
| size := r.buffer.ReadUint32() |
| data := r.buffer.Read(size) |
| var meta pb.MessageMetadata |
| if err := proto.Unmarshal(data, &meta); err != nil { |
| return nil, ErrCorruptedMessage |
| } |
| |
| if meta.NumMessagesInBatch != nil { |
| r.batched = true |
| } |
| |
| return &meta, nil |
| } |
| |
| func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) { |
| if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 { |
| return nil, nil, ErrEOM |
| } |
| if !r.batched { |
| return r.readMessage() |
| } |
| |
| return r.readSingleMessage() |
| } |
| |
| func (r *MessageReader) readMessage() (*pb.SingleMessageMetadata, []byte, error) { |
| // Wire format |
| // [PAYLOAD] |
| |
| return nil, r.buffer.Read(r.buffer.ReadableBytes()), nil |
| } |
| |
| func (r *MessageReader) readSingleMessage() (*pb.SingleMessageMetadata, []byte, error) { |
| // Wire format |
| // [METADATA_SIZE][METADATA][PAYLOAD] |
| |
| size := r.buffer.ReadUint32() |
| var meta pb.SingleMessageMetadata |
| if err := proto.Unmarshal(r.buffer.Read(size), &meta); err != nil { |
| return nil, nil, err |
| } |
| |
| return &meta, r.buffer.Read(uint32(meta.GetPayloadSize())), nil |
| } |
| |
| func (r *MessageReader) ResetBuffer(buffer Buffer) { |
| r.buffer = buffer |
| } |
| |
| func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand { |
| cmd := &pb.BaseCommand{ |
| Type: &cmdType, |
| } |
| switch cmdType { |
| case pb.BaseCommand_CONNECT: |
| cmd.Connect = msg.(*pb.CommandConnect) |
| case pb.BaseCommand_LOOKUP: |
| cmd.LookupTopic = msg.(*pb.CommandLookupTopic) |
| case pb.BaseCommand_PARTITIONED_METADATA: |
| cmd.PartitionMetadata = msg.(*pb.CommandPartitionedTopicMetadata) |
| case pb.BaseCommand_PRODUCER: |
| cmd.Producer = msg.(*pb.CommandProducer) |
| case pb.BaseCommand_SUBSCRIBE: |
| cmd.Subscribe = msg.(*pb.CommandSubscribe) |
| case pb.BaseCommand_FLOW: |
| cmd.Flow = msg.(*pb.CommandFlow) |
| case pb.BaseCommand_PING: |
| cmd.Ping = msg.(*pb.CommandPing) |
| case pb.BaseCommand_PONG: |
| cmd.Pong = msg.(*pb.CommandPong) |
| case pb.BaseCommand_SEND: |
| cmd.Send = msg.(*pb.CommandSend) |
| case pb.BaseCommand_CLOSE_PRODUCER: |
| cmd.CloseProducer = msg.(*pb.CommandCloseProducer) |
| case pb.BaseCommand_CLOSE_CONSUMER: |
| cmd.CloseConsumer = msg.(*pb.CommandCloseConsumer) |
| case pb.BaseCommand_ACK: |
| cmd.Ack = msg.(*pb.CommandAck) |
| case pb.BaseCommand_SEEK: |
| cmd.Seek = msg.(*pb.CommandSeek) |
| case pb.BaseCommand_UNSUBSCRIBE: |
| cmd.Unsubscribe = msg.(*pb.CommandUnsubscribe) |
| case pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES: |
| cmd.RedeliverUnacknowledgedMessages = msg.(*pb.CommandRedeliverUnacknowledgedMessages) |
| case pb.BaseCommand_GET_TOPICS_OF_NAMESPACE: |
| cmd.GetTopicsOfNamespace = msg.(*pb.CommandGetTopicsOfNamespace) |
| case pb.BaseCommand_GET_LAST_MESSAGE_ID: |
| cmd.GetLastMessageId = msg.(*pb.CommandGetLastMessageId) |
| case pb.BaseCommand_AUTH_RESPONSE: |
| cmd.AuthResponse = msg.(*pb.CommandAuthResponse) |
| default: |
| panic(fmt.Sprintf("Missing command type: %v", cmdType)) |
| } |
| |
| return cmd |
| } |
| |
| func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload []byte) { |
| metadataSize := uint32(smm.Size()) |
| wb.WriteUint32(metadataSize) |
| |
| wb.ResizeIfNeeded(metadataSize) |
| _, err := smm.MarshalToSizedBuffer(wb.WritableSlice()[:metadataSize]) |
| if err != nil { |
| panic(fmt.Sprintf("Protobuf serialization error: %v", err)) |
| } |
| |
| wb.WrittenBytes(metadataSize) |
| wb.Write(payload) |
| } |
| |
| func serializeBatch(wb Buffer, |
| cmdSend *pb.BaseCommand, |
| msgMetadata *pb.MessageMetadata, |
| uncompressedPayload Buffer, |
| compressionProvider compression.Provider) { |
| // Wire format |
| // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD] |
| cmdSize := uint32(proto.Size(cmdSend)) |
| msgMetadataSize := uint32(proto.Size(msgMetadata)) |
| |
| frameSizeIdx := wb.WriterIndex() |
| wb.WriteUint32(0) // Skip frame size until we now the size |
| frameStartIdx := wb.WriterIndex() |
| |
| // Write cmd |
| wb.WriteUint32(cmdSize) |
| wb.ResizeIfNeeded(cmdSize) |
| _, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize]) |
| if err != nil { |
| panic(fmt.Sprintf("Protobuf error when serializing cmdSend: %v", err)) |
| } |
| wb.WrittenBytes(cmdSize) |
| |
| // Create checksum placeholder |
| wb.WriteUint16(magicCrc32c) |
| checksumIdx := wb.WriterIndex() |
| wb.WriteUint32(0) // skip 4 bytes of checksum |
| |
| // Write metadata |
| metadataStartIdx := wb.WriterIndex() |
| wb.WriteUint32(msgMetadataSize) |
| wb.ResizeIfNeeded(msgMetadataSize) |
| _, err = msgMetadata.MarshalToSizedBuffer(wb.WritableSlice()[:msgMetadataSize]) |
| if err != nil { |
| panic(fmt.Sprintf("Protobuf error when serializing msgMetadata: %v", err)) |
| } |
| wb.WrittenBytes(msgMetadataSize) |
| |
| // Make sure the buffer has enough space to hold the compressed data |
| // and perform the compression in-place |
| maxSize := uint32(compressionProvider.CompressMaxSize(int(uncompressedPayload.ReadableBytes()))) |
| wb.ResizeIfNeeded(maxSize) |
| b := compressionProvider.Compress(wb.WritableSlice()[:0], uncompressedPayload.ReadableSlice()) |
| wb.WrittenBytes(uint32(len(b))) |
| |
| // Write checksum at created checksum-placeholder |
| frameEndIdx := wb.WriterIndex() |
| checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, frameEndIdx-metadataStartIdx)) |
| |
| // Set Sizes and checksum in the fixed-size header |
| wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame |
| wb.PutUint32(checksum, checksumIdx) |
| } |
| |
| // ConvertFromStringMap convert a string map to a KeyValue []byte |
| func ConvertFromStringMap(m map[string]string) []*pb.KeyValue { |
| list := make([]*pb.KeyValue, len(m)) |
| |
| i := 0 |
| for k, v := range m { |
| list[i] = &pb.KeyValue{ |
| Key: proto.String(k), |
| Value: proto.String(v), |
| } |
| |
| i++ |
| } |
| |
| return list |
| } |
| |
| // ConvertToStringMap convert a KeyValue []byte to string map |
| func ConvertToStringMap(pbb []*pb.KeyValue) map[string]string { |
| m := make(map[string]string) |
| |
| for _, kv := range pbb { |
| m[*kv.Key] = *kv.Value |
| } |
| |
| return m |
| } |