blob: 77bff5a4d5d32092a16222dd0966f3a876e41b3f [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Based on the ring buffer implementation in
// NodeBIO (https://github.com/joyent/node/blob/master/src/node_crypto_bio.h).
// The following is the license for that implementation:
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
#include "ring_buffer.hpp"
#include <assert.h>
#include <string.h>
using namespace datastax::internal::rb;
// Releases every dynamically allocated buffer in the ring. The traversal
// starts after `head_` and stops when it comes back around to it, so the
// embedded `head_` buffer itself is never passed to `delete`.
RingBuffer::~RingBuffer() {
  for (Buffer* node = head_.next_; node != &head_;) {
    // Grab the successor first: `node` is gone after the delete.
    Buffer* const following = node->next_;
    delete node;
    node = following;
  }
  write_head_ = NULL;
  read_head_ = NULL;
}
void RingBuffer::try_move_read_head() {
// `read_pos_` and `write_pos_` means the position of the reader and writer
// inside the buffer, respectively. When they're equal - its safe to reset
// them, because both reader and writer will continue doing their stuff
// from new (zero) positions.
while (read_head_->read_pos_ != 0 && read_head_->read_pos_ == read_head_->write_pos_) {
// Reset positions
read_head_->read_pos_ = 0;
read_head_->write_pos_ = 0;
// Move read_head_ forward, just in case if there're still some data to
// read in the next buffer.
if (read_head_ != write_head_) read_head_ = read_head_->next_;
}
}
size_t RingBuffer::read(char* out, size_t size) {
size_t bytes_read = 0;
size_t expected = length_ > size ? size : length_;
size_t offset = 0;
size_t left = size;
while (bytes_read < expected) {
assert(read_head_->read_pos_ <= read_head_->write_pos_);
size_t avail = read_head_->write_pos_ - read_head_->read_pos_;
if (avail > left) avail = left;
// Copy data
if (out != NULL) memcpy(out + offset, read_head_->data_ + read_head_->read_pos_, avail);
read_head_->read_pos_ += avail;
// Move pointers
bytes_read += avail;
offset += avail;
left -= avail;
try_move_read_head();
}
assert(expected == bytes_read);
length_ -= bytes_read;
// Free all empty buffers, but write_head's child
free_empty();
return bytes_read;
}
// Deallocates the unused buffers that sit between the write head and the read
// head, keeping one spare buffer (`child`) directly after the write head.
// The embedded `head_` buffer is never deleted; it is re-linked into the ring
// instead.
void RingBuffer::free_empty() {
// The buffer right after the write head is always kept as a spare.
Buffer* child = write_head_->next_;
// Nothing to free if the ring has a single buffer or the spare is the read head.
if (child == write_head_ || child == read_head_) return;
// First candidate for deallocation is the buffer after the spare.
Buffer* cur = child->next_;
if (cur == write_head_ || cur == read_head_) return;
// `prev` is the last buffer that stays in the ring; it gets re-linked below.
Buffer* prev = child;
while (cur != read_head_) {
// Skip embedded buffer, and continue deallocating again starting from it
if (cur == &head_) {
// `head_` cannot be deleted, so splice it back in after `prev`.
prev->next_ = cur;
prev = cur;
cur = head_.next_;
continue;
}
// Only empty buffers strictly between the spare and the read head are freed.
assert(cur != write_head_);
assert(cur->write_pos_ == cur->read_pos_);
// Save the successor before deleting the current buffer.
Buffer* next = cur->next_;
delete cur;
cur = next;
}
// The only buffers that can survive the walk are the spare and `head_`.
assert(prev == child || prev == &head_);
// Close the ring: the last surviving buffer now points at the read head.
prev->next_ = cur;
}
// Searches the unread data for the first occurrence of `delim` without
// consuming anything.
//
// At most `limit` bytes are examined. Returns the offset of `delim` relative
// to the current read position, or the number of bytes examined (the smaller
// of `limit` and the stored length) when `delim` is not found.
size_t RingBuffer::index_of(char delim, size_t limit) {
  const size_t max = length_ > limit ? limit : length_;
  size_t bytes_read = 0;
  size_t left = limit;
  Buffer* current = read_head_;

  while (bytes_read < max) {
    assert(current->read_pos_ <= current->write_pos_);
    size_t avail = current->write_pos_ - current->read_pos_;
    if (avail > left) avail = left;

    // Scan this buffer's unread bytes with the library routine instead of a
    // hand-written byte loop.
    const char* const start = current->data_ + current->read_pos_;
    const void* const hit = memchr(start, delim, avail);

    // Found `delim`
    if (hit != NULL) {
      return bytes_read + static_cast<size_t>(static_cast<const char*>(hit) - start);
    }

    // Not in this buffer: account for everything that was scanned.
    bytes_read += avail;
    left -= avail;

    // Move to next buffer only once this one has been scanned to its end.
    if (current->read_pos_ + avail == BUFFER_LENGTH) {
      current = current->next_;
    }
  }
  assert(max == bytes_read);

  return max;
}
// Appends `size` bytes from `data` to the ring, allocating additional buffers
// whenever the current write buffer fills up and more bytes remain.
void RingBuffer::write(const char* data, size_t size) {
  const char* src = data;
  size_t left = size;

  while (left > 0) {
    assert(write_head_->write_pos_ <= BUFFER_LENGTH);

    // Write as much as fits into the current buffer.
    const size_t room = BUFFER_LENGTH - write_head_->write_pos_;
    const size_t chunk = (left > room) ? room : left;
    memcpy(write_head_->data_ + write_head_->write_pos_, src, chunk);

    // Advance the bookkeeping past the bytes just copied.
    src += chunk;
    left -= chunk;
    length_ += chunk;
    write_head_->write_pos_ += chunk;
    assert(write_head_->write_pos_ <= BUFFER_LENGTH);

    if (left == 0) break;

    // More bytes remain, so the current buffer must be full: make sure a
    // usable buffer follows it and switch over.
    assert(write_head_->write_pos_ == BUFFER_LENGTH);
    try_allocate_for_write();
    write_head_ = write_head_->next_;

    // Now that the write head has moved on, the read head may be able to
    // advance as well.
    try_move_read_head();
  }
  assert(left == 0);
}
char* RingBuffer::peek_writable(size_t* size) {
size_t available = BUFFER_LENGTH - write_head_->write_pos_;
if (*size != 0 && available > *size)
available = *size;
else
*size = available;
return write_head_->data_ + write_head_->write_pos_;
}
// Marks `size` bytes of the current write buffer as written and, if that
// fills the buffer, advances the write head to the next buffer.
void RingBuffer::commit(size_t size) {
  length_ += size;
  write_head_->write_pos_ += size;
  assert(write_head_->write_pos_ <= BUFFER_LENGTH);

  // If the write head is now full and has nowhere to go, this inserts a
  // fresh buffer right after it.
  try_allocate_for_write();

  if (write_head_->write_pos_ != BUFFER_LENGTH) return;

  write_head_ = write_head_->next_;

  // The write head moved to the next buffer, so the read head may be able
  // to move too.
  try_move_read_head();
}
void RingBuffer::try_allocate_for_write() {
// If write head is full, next buffer is either read head or not empty.
if (write_head_->write_pos_ == BUFFER_LENGTH &&
(write_head_->next_ == read_head_ || write_head_->next_->write_pos_ != 0)) {
Buffer* next = new Buffer();
next->next_ = write_head_->next_;
write_head_->next_ = next;
}
}
void RingBuffer::reset() {
while (read_head_->read_pos_ != read_head_->write_pos_) {
assert(read_head_->write_pos_ > read_head_->read_pos_);
length_ -= read_head_->write_pos_ - read_head_->read_pos_;
read_head_->write_pos_ = 0;
read_head_->read_pos_ = 0;
read_head_ = read_head_->next_;
}
write_head_ = read_head_;
assert(length_ == 0);
}