| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package internal |
| |
| import ( |
| "bufio" |
| "io" |
| |
| pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" |
| "github.com/gogo/protobuf/proto" |
| "github.com/pkg/errors" |
| ) |
| |
| type connectionReader struct { |
| cnx *connection |
| buffer Buffer |
| reader *bufio.Reader |
| } |
| |
| func newConnectionReader(cnx *connection) *connectionReader { |
| return &connectionReader{ |
| cnx: cnx, |
| reader: bufio.NewReader(cnx.cnx), |
| buffer: NewBuffer(4096), |
| } |
| } |
| |
| func (r *connectionReader) readFromConnection() { |
| for { |
| cmd, headersAndPayload, err := r.readSingleCommand() |
| if err != nil { |
| r.cnx.log.WithError(err).Info("Error reading from connection") |
| r.cnx.TriggerClose() |
| break |
| } |
| |
| // Process |
| var payloadLen uint32 |
| if headersAndPayload != nil { |
| payloadLen = headersAndPayload.ReadableBytes() |
| } |
| r.cnx.log.Debug("Got command! ", cmd, " with payload size: ", payloadLen) |
| r.cnx.receivedCommand(cmd, headersAndPayload) |
| } |
| } |
| |
| func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload Buffer, err error) { |
| // First, we need to read the frame size |
| if r.buffer.ReadableBytes() < 4 { |
| if r.buffer.ReadableBytes() == 0 { |
| // If the buffer is empty, just go back to write at the beginning |
| r.buffer.Clear() |
| } |
| if err := r.readAtLeast(4); err != nil { |
| return nil, nil, errors.Errorf("Short read when reading frame size: %s", err) |
| } |
| } |
| |
| // We have enough to read frame size |
| frameSize := r.buffer.ReadUint32() |
| if r.cnx.maxMessageSize != 0 && int32(frameSize) > (r.cnx.maxMessageSize+MessageFramePadding) { |
| r.cnx.log.Warnf("Received too big frame size. size=%d", frameSize) |
| r.cnx.TriggerClose() |
| return nil, nil, errors.New("Frame size too big") |
| } |
| |
| // Next, we read the rest of the frame |
| if r.buffer.ReadableBytes() < frameSize { |
| remainingBytes := frameSize - r.buffer.ReadableBytes() |
| if err := r.readAtLeast(remainingBytes); err != nil { |
| return nil, nil, errors.Errorf("Short read when reading frame: %s", err) |
| } |
| } |
| |
| // We have now the complete frame |
| cmdSize := r.buffer.ReadUint32() |
| cmd, err = r.deserializeCmd(r.buffer.Read(cmdSize)) |
| if err != nil { |
| return nil, nil, err |
| } |
| |
| // Also read the eventual payload |
| headersAndPayloadSize := frameSize - (cmdSize + 4) |
| if cmdSize+4 < frameSize { |
| headersAndPayload = NewBuffer(int(headersAndPayloadSize)) |
| headersAndPayload.Write(r.buffer.Read(headersAndPayloadSize)) |
| } |
| return cmd, headersAndPayload, nil |
| } |
| |
| func (r *connectionReader) readAtLeast(size uint32) error { |
| if r.buffer.WritableBytes() < size { |
| // There's not enough room in the current buffer to read the requested amount of data |
| totalFrameSize := r.buffer.ReadableBytes() + size |
| if r.buffer.ReadableBytes()+size > r.buffer.Capacity() { |
| // Resize to a bigger buffer to avoid continuous resizing |
| r.buffer.Resize(totalFrameSize * 2) |
| } else { |
| // Compact the buffer by moving the partial data to the beginning. |
| // This will have enough room for reading the remainder of the data |
| r.buffer.MoveToFront() |
| } |
| } |
| |
| n, err := io.ReadAtLeast(r.cnx.cnx, r.buffer.WritableSlice(), int(size)) |
| if err != nil { |
| r.cnx.TriggerClose() |
| return err |
| } |
| |
| r.buffer.WrittenBytes(uint32(n)) |
| return nil |
| } |
| |
| func (r *connectionReader) deserializeCmd(data []byte) (*pb.BaseCommand, error) { |
| cmd := &pb.BaseCommand{} |
| err := proto.Unmarshal(data, cmd) |
| if err != nil { |
| r.cnx.log.WithError(err).Warn("Failed to parse protobuf command") |
| r.cnx.TriggerClose() |
| return nil, err |
| } |
| return cmd, nil |
| } |