blob: 60c179d127aec6feb58924744ce55df68b7c29f2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package internal
import (
"bufio"
"io"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
)
type connectionReader struct {
cnx *connection
buffer Buffer
reader *bufio.Reader
}
func newConnectionReader(cnx *connection) *connectionReader {
return &connectionReader{
cnx: cnx,
reader: bufio.NewReader(cnx.cnx),
buffer: NewBuffer(4096),
}
}
func (r *connectionReader) readFromConnection() {
for {
cmd, headersAndPayload, err := r.readSingleCommand()
if err != nil {
r.cnx.log.WithError(err).Info("Error reading from connection")
r.cnx.TriggerClose()
break
}
// Process
var payloadLen uint32
if headersAndPayload != nil {
payloadLen = headersAndPayload.ReadableBytes()
}
r.cnx.log.Debug("Got command! ", cmd, " with payload size: ", payloadLen)
r.cnx.receivedCommand(cmd, headersAndPayload)
}
}
func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload Buffer, err error) {
// First, we need to read the frame size
if r.buffer.ReadableBytes() < 4 {
if r.buffer.ReadableBytes() == 0 {
// If the buffer is empty, just go back to write at the beginning
r.buffer.Clear()
}
if err := r.readAtLeast(4); err != nil {
return nil, nil, errors.Errorf("Short read when reading frame size: %s", err)
}
}
// We have enough to read frame size
frameSize := r.buffer.ReadUint32()
if frameSize > MaxFrameSize {
r.cnx.log.Warnf("Received too big frame size. size=%d", frameSize)
r.cnx.TriggerClose()
return nil, nil, errors.New("Frame size too big")
}
// Next, we read the rest of the frame
if r.buffer.ReadableBytes() < frameSize {
remainingBytes := frameSize - r.buffer.ReadableBytes()
if err := r.readAtLeast(remainingBytes); err != nil {
return nil, nil, errors.Errorf("Short read when reading frame: %s", err)
}
}
// We have now the complete frame
cmdSize := r.buffer.ReadUint32()
cmd, err = r.deserializeCmd(r.buffer.Read(cmdSize))
if err != nil {
return nil, nil, err
}
// Also read the eventual payload
headersAndPayloadSize := frameSize - (cmdSize + 4)
if cmdSize+4 < frameSize {
headersAndPayload = NewBuffer(int(headersAndPayloadSize))
headersAndPayload.Write(r.buffer.Read(headersAndPayloadSize))
}
return cmd, headersAndPayload, nil
}
func (r *connectionReader) readAtLeast(size uint32) error {
if r.buffer.WritableBytes() < size {
// There's not enough room in the current buffer to read the requested amount of data
totalFrameSize := r.buffer.ReadableBytes() + size
if r.buffer.ReadableBytes()+size > r.buffer.Capacity() {
// Resize to a bigger buffer to avoid continuous resizing
r.buffer.Resize(totalFrameSize * 2)
} else {
// Compact the buffer by moving the partial data to the beginning.
// This will have enough room for reading the remainder of the data
r.buffer.MoveToFront()
}
}
n, err := io.ReadAtLeast(r.cnx.cnx, r.buffer.WritableSlice(), int(size))
if err != nil {
r.cnx.TriggerClose()
return err
}
r.buffer.WrittenBytes(uint32(n))
return nil
}
func (r *connectionReader) deserializeCmd(data []byte) (*pb.BaseCommand, error) {
cmd := &pb.BaseCommand{}
err := proto.Unmarshal(data, cmd)
if err != nil {
r.cnx.log.WithError(err).Warn("Failed to parse protobuf command")
r.cnx.TriggerClose()
return nil, err
}
return cmd, nil
}