blob: 2d5a65c75c05d747f3e154fbaab298a64a12d310 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use super::memory_pool::{BytesMutExt, memory_pool};
use bytes::{Buf, BufMut, BytesMut};
use compio::buf::{IoBuf, IoBufMut, SetBufInit};
use std::ops::{Deref, DerefMut};
/// A buffer wrapper that participates in memory pooling.
///
/// This buffer automatically acquires memory from the global memory pool
/// and returns it when dropped. It also tracks resize events to keep
/// pool accounting accurate.
#[derive(Debug)]
pub struct PooledBuffer {
    // True when `inner` was handed out by the memory pool; gates the
    // return-to-pool logic in `Drop` and the accounting in `check_for_resize`.
    from_pool: bool,
    // The capacity the pool last accounted for; refreshed whenever
    // `check_for_resize` observes a reallocation.
    original_capacity: usize,
    // Index of the pool bucket currently charged for this buffer, or `None`
    // for external/non-pooled allocations.
    original_bucket_idx: Option<usize>,
    // The underlying byte storage.
    inner: BytesMut,
}
impl Default for PooledBuffer {
fn default() -> Self {
Self::empty()
}
}
impl PooledBuffer {
    /// Creates a new pooled buffer with at least the specified capacity.
    ///
    /// The buffer is acquired through the global memory pool; when the pool
    /// serves the request, the buffer is associated with the best-fitting
    /// bucket so usage counters stay accurate.
    ///
    /// # Arguments
    ///
    /// * `capacity` - The capacity of the buffer
    pub fn with_capacity(capacity: usize) -> Self {
        let (buffer, was_pool_allocated) = memory_pool().acquire_buffer(capacity);
        let original_capacity = buffer.capacity();
        // Only pool-served buffers are tied to a bucket; allocations the
        // pool could not serve are not bucket-tracked.
        let original_bucket_idx = if was_pool_allocated {
            memory_pool().best_fit(original_capacity)
        } else {
            None
        };
        Self {
            from_pool: was_pool_allocated,
            original_capacity,
            original_bucket_idx,
            inner: buffer,
        }
    }

    /// Creates a new pooled buffer from an existing `BytesMut`.
    ///
    /// The resulting buffer is not pool-tracked and will not be returned to
    /// the pool on drop.
    ///
    /// # Arguments
    ///
    /// * `existing` - The existing `BytesMut` buffer
    pub fn from_existing(existing: BytesMut) -> Self {
        Self {
            from_pool: false,
            original_capacity: existing.capacity(),
            original_bucket_idx: None,
            inner: existing,
        }
    }

    /// Creates an empty pooled buffer that owns no pool memory.
    pub fn empty() -> Self {
        Self {
            from_pool: false,
            original_capacity: 0,
            original_bucket_idx: None,
            inner: BytesMut::new(),
        }
    }

    /// Checks if the buffer needs to be resized and updates the memory pool accordingly.
    /// This shall be called after operations that might cause a resize.
    pub fn check_for_resize(&mut self) {
        // Non-pooled buffers need no accounting.
        if !self.from_pool {
            return;
        }
        let current_capacity = self.inner.capacity();
        if current_capacity == self.original_capacity {
            return;
        }
        memory_pool().inc_resize_events();
        if let Some(orig_idx) = self.original_bucket_idx {
            // The previous bucket no longer holds this buffer.
            memory_pool().dec_bucket_in_use(orig_idx);
            if let Some(new_idx) = memory_pool().best_fit(current_capacity) {
                // Track as a new allocation in the new bucket
                memory_pool().inc_bucket_alloc(new_idx);
                memory_pool().inc_bucket_in_use(new_idx);
                self.original_bucket_idx = Some(new_idx);
            } else {
                // Track as an external allocation if no bucket fits
                memory_pool().inc_external_allocations();
                self.original_bucket_idx = None;
            }
        }
        self.original_capacity = current_capacity;
    }

    /// Runs a mutating operation on the inner buffer and reconciles pool
    /// accounting if the operation caused a reallocation.
    fn track_resize<R>(&mut self, op: impl FnOnce(&mut BytesMut) -> R) -> R {
        let before_cap = self.inner.capacity();
        let result = op(&mut self.inner);
        if self.inner.capacity() != before_cap {
            self.check_for_resize();
        }
        result
    }

    /// Wrapper for reserve which might cause resize
    pub fn reserve(&mut self, additional: usize) {
        self.track_resize(|inner| inner.reserve(additional));
    }

    /// Wrapper for extend_from_slice which might cause resize
    pub fn extend_from_slice(&mut self, extend_from: &[u8]) {
        self.track_resize(|inner| inner.extend_from_slice(extend_from));
    }

    /// Wrapper for put_bytes which might cause resize
    pub fn put_bytes(&mut self, byte: u8, len: usize) {
        self.track_resize(|inner| inner.put_bytes(byte, len));
    }

    /// Wrapper for put_slice which might cause resize
    pub fn put_slice(&mut self, src: &[u8]) {
        self.track_resize(|inner| inner.put_slice(src));
    }

    /// Wrapper for put_u32_le which might cause resize
    pub fn put_u32_le(&mut self, value: u32) {
        self.track_resize(|inner| inner.put_u32_le(value));
    }

    /// Wrapper for put_u64_le which might cause resize
    pub fn put_u64_le(&mut self, value: u64) {
        self.track_resize(|inner| inner.put_u64_le(value));
    }

    /// Returns the capacity of the inner buffer
    pub fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    /// Returns the length of the inner buffer
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns true if the buffer is empty
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Consumes the PooledBuffer and returns the inner BytesMut.
    /// Note: This bypasses pool return logic (the buffer is never handed
    /// back to the pool), use with caution.
    pub fn into_inner(self) -> BytesMut {
        // ManuallyDrop suppresses `Drop::drop`, so ownership of the memory
        // transfers to the caller instead of being returned to the pool.
        let mut this = std::mem::ManuallyDrop::new(self);
        std::mem::take(&mut this.inner)
    }
}
// Read-only access to the wrapped `BytesMut`, enabling `&buf[..]`, slice
// methods, etc. via auto-deref.
impl Deref for PooledBuffer {
    type Target = BytesMut;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
// Mutable access to the wrapped `BytesMut`.
//
// NOTE(review): mutations performed through this `&mut BytesMut` bypass the
// resize-tracking wrappers; callers that grow the buffer this way should
// call `check_for_resize` afterwards to keep pool accounting accurate.
impl DerefMut for PooledBuffer {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
impl Drop for PooledBuffer {
    fn drop(&mut self) {
        // Only pool-acquired buffers are handed back; externally created
        // buffers are freed normally by `BytesMut`'s own drop.
        if self.from_pool {
            // Move the buffer out, leaving an empty `BytesMut` behind.
            let buf = std::mem::take(&mut self.inner);
            // `original_capacity` is the capacity the pool last accounted
            // for (kept current by `check_for_resize`). NOTE(review): the
            // `true` flag's meaning is defined by `BytesMutExt::return_to_pool`
            // — confirm it marks the buffer as pool-originated.
            buf.return_to_pool(self.original_capacity, true);
        }
    }
}
impl From<&[u8]> for PooledBuffer {
    /// Creates a pooled buffer containing a copy of `slice`.
    fn from(slice: &[u8]) -> Self {
        let mut buf = PooledBuffer::with_capacity(slice.len());
        // Use the resize-tracking wrapper rather than writing to `buf.inner`
        // directly, so pool accounting stays correct in the unlikely event
        // the copy triggers a reallocation (e.g. if the pool returned a
        // buffer smaller than requested).
        buf.extend_from_slice(slice);
        buf
    }
}
impl From<BytesMut> for PooledBuffer {
    /// Wraps an externally allocated `BytesMut`; the result is not tracked
    /// by the memory pool (same contract as [`PooledBuffer::from_existing`]).
    fn from(bytes: BytesMut) -> Self {
        Self {
            from_pool: false,
            original_capacity: bytes.capacity(),
            original_bucket_idx: None,
            inner: bytes,
        }
    }
}
// Delegate `bytes::Buf` to the inner `BytesMut` so the buffer can be
// consumed (read and advanced) like any other `Buf`.
//
// NOTE(review): `BytesMut::advance` moves the buffer's start forward, which
// also shrinks `capacity()`; a later `check_for_resize` would observe that
// as a capacity change. Confirm this interaction is intended.
impl Buf for PooledBuffer {
    fn remaining(&self) -> usize {
        self.inner.remaining()
    }
    fn chunk(&self) -> &[u8] {
        self.inner.chunk()
    }
    fn advance(&mut self, cnt: usize) {
        self.inner.advance(cnt)
    }
    fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize {
        self.inner.chunks_vectored(dst)
    }
}
impl SetBufInit for PooledBuffer {
    // Called by compio after an I/O operation to declare how many bytes of
    // the buffer are now initialized.
    //
    // SAFETY: the caller must guarantee that at least `len` bytes are
    // actually initialized and that `len` does not exceed the buffer's
    // capacity — `set_len` past capacity would be invalid.
    // NOTE(review): presumably compio's contract upholds this; confirm.
    unsafe fn set_buf_init(&mut self, len: usize) {
        // Only ever grow the visible length; never truncate data already
        // accounted for.
        if self.inner.len() <= len {
            unsafe {
                self.inner.set_len(len);
            }
        }
    }
}
// Lets compio use this buffer as the destination of I/O operations.
unsafe impl IoBufMut for PooledBuffer {
    // Mutable pointer to the start of the underlying storage.
    fn as_buf_mut_ptr(&mut self) -> *mut u8 {
        self.inner.as_mut_ptr()
    }
}
// Lets compio use this buffer as the source of I/O operations.
unsafe impl IoBuf for PooledBuffer {
    fn as_buf_ptr(&self) -> *const u8 {
        // Delegates to `BytesMut`'s own `IoBuf` implementation.
        self.inner.as_buf_ptr()
    }
    // Number of initialized bytes available for reading.
    fn buf_len(&self) -> usize {
        self.inner.len()
    }
    // Total writable capacity of the underlying storage.
    fn buf_capacity(&self) -> usize {
        self.inner.capacity()
    }
}