blob: aa6b4f134a4bbbe2cee1315edc9b0fcea82b04f3 [file] [log] [blame]
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2017 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _RDBUF_H
#define _RDBUF_H
#ifndef _MSC_VER
/* for struct iovec */
#include <sys/socket.h>
#include <sys/types.h>
#endif
#include "rdsysqueue.h"
/**
* @name Generic byte buffers
*
* @{
*
* A buffer is a list of segments, each segment having a memory pointer,
* write offset, and capacity.
*
* The main buffer and segment structure is tailored for append-writing
* or append-pushing foreign memory.
*
* Updates of previously written memory regions are possible through the
* use of write_update() that takes an absolute offset.
*
* The write position is part of the buffer and segment structures, while
* read is a separate object (rd_slice_t) that does not affect the buffer.
*/
/**
* @brief Buffer segment
*/
typedef struct rd_segment_s {
TAILQ_ENTRY(rd_segment_s) seg_link; /*<< rbuf_segments Link */
char *seg_p; /**< Backing-store memory */
size_t seg_of; /**< Current relative write-position
* (length of payload in this segment) */
size_t seg_size; /**< Allocated size of seg_p */
size_t seg_absof; /**< Absolute offset of this segment's
* beginning in the grand rd_buf_t */
void (*seg_free) (void *p); /**< Optional free function for seg_p */
int seg_flags; /**< Segment flags */
#define RD_SEGMENT_F_RDONLY 0x1 /**< Read-only segment */
#define RD_SEGMENT_F_FREE 0x2 /**< Free segment on destroy,
* e.g, not a fixed segment. */
} rd_segment_t;
TAILQ_HEAD(rd_segment_head,rd_segment_s);
/**
* @brief Buffer, containing a list of segments.
*/
typedef struct rd_buf_s {
struct rd_segment_head rbuf_segments; /**< TAILQ list of segments */
size_t rbuf_segment_cnt; /**< Number of segments */
rd_segment_t *rbuf_wpos; /**< Current write position seg */
size_t rbuf_len; /**< Current (written) length */
size_t rbuf_size; /**< Total allocated size of
* all segments. */
char *rbuf_extra; /* Extra memory allocated for
* use by segment structs,
* buffer memory, etc. */
size_t rbuf_extra_len; /* Current extra memory used */
size_t rbuf_extra_size; /* Total size of extra memory */
} rd_buf_t;
/**
* @brief A read-only slice of a buffer.
*/
typedef struct rd_slice_s {
const rd_buf_t *buf; /**< Pointer to buffer */
const rd_segment_t *seg; /**< Current read position segment.
* Will point to NULL when end of
* slice is reached. */
size_t rof; /**< Relative read offset in segment */
size_t start; /**< Slice start offset in buffer */
size_t end; /**< Slice end offset in buffer+1 */
} rd_slice_t;
/**
* @returns the current write position (absolute offset)
*/
static RD_INLINE RD_UNUSED size_t rd_buf_write_pos (const rd_buf_t *rbuf) {
const rd_segment_t *seg = rbuf->rbuf_wpos;
if (unlikely(!seg)) {
#if ENABLE_DEVEL
rd_assert(rbuf->rbuf_len == 0);
#endif
return 0;
}
#if ENABLE_DEVEL
rd_assert(seg->seg_absof + seg->seg_of == rbuf->rbuf_len);
#endif
return seg->seg_absof + seg->seg_of;
}
/**
* @returns the number of bytes available for writing (before growing).
*/
static RD_INLINE RD_UNUSED size_t rd_buf_write_remains (const rd_buf_t *rbuf) {
return rbuf->rbuf_size - rbuf->rbuf_len;
}
/**
* @returns the number of bytes remaining to write to the given segment,
* and sets the \p *p pointer (unless NULL) to the start of
* the contiguous memory.
*/
static RD_INLINE RD_UNUSED size_t
rd_segment_write_remains (const rd_segment_t *seg, void **p) {
if (unlikely((seg->seg_flags & RD_SEGMENT_F_RDONLY)))
return 0;
if (p)
*p = (void *)(seg->seg_p + seg->seg_of);
return seg->seg_size - seg->seg_of;
}
/**
* @returns the last segment for the buffer.
*/
static RD_INLINE RD_UNUSED rd_segment_t *rd_buf_last (const rd_buf_t *rbuf) {
return TAILQ_LAST(&rbuf->rbuf_segments, rd_segment_head);
}
/**
* @returns the total written buffer length
*/
static RD_INLINE RD_UNUSED size_t rd_buf_len (const rd_buf_t *rbuf) {
return rbuf->rbuf_len;
}
int rd_buf_write_seek (rd_buf_t *rbuf, size_t absof);
size_t rd_buf_write (rd_buf_t *rbuf, const void *payload, size_t size);
size_t rd_buf_write_slice (rd_buf_t *rbuf, rd_slice_t *slice);
size_t rd_buf_write_update (rd_buf_t *rbuf, size_t absof,
const void *payload, size_t size);
void rd_buf_push (rd_buf_t *rbuf, const void *payload, size_t size,
void (*free_cb)(void *));
size_t rd_buf_get_writable (rd_buf_t *rbuf, void **p);
void rd_buf_write_ensure_contig (rd_buf_t *rbuf, size_t size);
void rd_buf_write_ensure (rd_buf_t *rbuf, size_t min_size, size_t max_size);
size_t rd_buf_get_write_iov (const rd_buf_t *rbuf,
struct iovec *iovs, size_t *iovcntp,
size_t iov_max, size_t size_max);
void rd_buf_init (rd_buf_t *rbuf, size_t fixed_seg_cnt, size_t buf_size);
void rd_buf_destroy (rd_buf_t *rbuf);
void rd_buf_dump (const rd_buf_t *rbuf, int do_hexdump);
int unittest_rdbuf (void);
/**@}*/
/**
* @name Buffer read operates on slices of an rd_buf_t and does not
* modify the underlying itself.
*
* @warning A slice will not be valid/safe after the buffer or
* segments have been modified by a buf write operation
* (write, update, write_seek, etc).
* @{
*/
/**
* @returns the remaining length in the slice
*/
#define rd_slice_remains(slice) ((slice)->end - rd_slice_abs_offset(slice))
/**
* @returns the total size of the slice, regardless of current position.
*/
#define rd_slice_size(slice) ((slice)->end - (slice)->start)
/**
* @returns the read position in the slice as a new slice.
*/
static RD_INLINE RD_UNUSED rd_slice_t rd_slice_pos (const rd_slice_t *slice) {
rd_slice_t newslice = *slice;
if (!slice->seg)
return newslice;
newslice.start = slice->seg->seg_absof + slice->rof;
return newslice;
}
/**
* @returns the read position as an absolute buffer byte offset.
* @remark this is the buffer offset, not the slice's local offset.
*/
static RD_INLINE RD_UNUSED size_t
rd_slice_abs_offset (const rd_slice_t *slice) {
if (unlikely(!slice->seg)) /* reader has reached the end */
return slice->end;
return slice->seg->seg_absof + slice->rof;
}
/**
* @returns the read position as a byte offset.
* @remark this is the slice-local offset, not the backing buffer's offset.
*/
static RD_INLINE RD_UNUSED size_t rd_slice_offset (const rd_slice_t *slice) {
if (unlikely(!slice->seg)) /* reader has reached the end */
return rd_slice_size(slice);
return (slice->seg->seg_absof + slice->rof) - slice->start;
}
int rd_slice_init_seg (rd_slice_t *slice, const rd_buf_t *rbuf,
const rd_segment_t *seg, size_t rof, size_t size);
int rd_slice_init (rd_slice_t *slice, const rd_buf_t *rbuf,
size_t absof, size_t size);
void rd_slice_init_full (rd_slice_t *slice, const rd_buf_t *rbuf);
size_t rd_slice_reader (rd_slice_t *slice, const void **p);
size_t rd_slice_peeker (const rd_slice_t *slice, const void **p);
size_t rd_slice_read (rd_slice_t *slice, void *dst, size_t size);
size_t rd_slice_peek (const rd_slice_t *slice, size_t offset,
void *dst, size_t size);
size_t rd_slice_read_varint (rd_slice_t *slice, size_t *nump);
const void *rd_slice_ensure_contig (rd_slice_t *slice, size_t size);
int rd_slice_seek (rd_slice_t *slice, size_t offset);
size_t rd_slice_get_iov (const rd_slice_t *slice,
struct iovec *iovs, size_t *iovcntp,
size_t iov_max, size_t size_max);
uint32_t rd_slice_crc32 (rd_slice_t *slice);
uint32_t rd_slice_crc32c (rd_slice_t *slice);
int rd_slice_narrow (rd_slice_t *slice, rd_slice_t *save_slice, size_t size)
RD_WARN_UNUSED_RESULT;
int rd_slice_narrow_relative (rd_slice_t *slice, rd_slice_t *save_slice,
size_t relsize)
RD_WARN_UNUSED_RESULT;
void rd_slice_widen (rd_slice_t *slice, const rd_slice_t *save_slice);
int rd_slice_narrow_copy (const rd_slice_t *orig, rd_slice_t *new_slice,
size_t size)
RD_WARN_UNUSED_RESULT;
int rd_slice_narrow_copy_relative (const rd_slice_t *orig,
rd_slice_t *new_slice,
size_t relsize)
RD_WARN_UNUSED_RESULT;
void rd_slice_dump (const rd_slice_t *slice, int do_hexdump);
/**@}*/
#endif /* _RDBUF_H */