blob: b44ce5943257048e6dc25ce44073d25ab2ddb651 [file] [log] [blame]
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2017 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "rd.h"
#include "rdbuf.h"
#include "rdunittest.h"
#include "rdlog.h"
#include "rdcrc32.h"
#include "crc32c.h"
static size_t
rd_buf_get_writable0 (rd_buf_t *rbuf, rd_segment_t **segp, void **p);
/**
* @brief Destroy the segment and free its payload.
*
* @remark Will NOT unlink from buffer.
*/
static void rd_segment_destroy (rd_segment_t *seg) {
/* Free payload */
if (seg->seg_free && seg->seg_p)
seg->seg_free(seg->seg_p);
if (seg->seg_flags & RD_SEGMENT_F_FREE)
rd_free(seg);
}
/**
* @brief Initialize segment with absolute offset, backing memory pointer,
* and backing memory size.
* @remark The segment is NOT linked.
*/
static void rd_segment_init (rd_segment_t *seg, void *mem, size_t size) {
memset(seg, 0, sizeof(*seg));
seg->seg_p = mem;
seg->seg_size = size;
}
/**
* @brief Append segment to buffer
*
* @remark Will set the buffer position to the new \p seg if no existing wpos.
* @remark Will set the segment seg_absof to the current length of the buffer.
*/
static rd_segment_t *rd_buf_append_segment (rd_buf_t *rbuf, rd_segment_t *seg) {
TAILQ_INSERT_TAIL(&rbuf->rbuf_segments, seg, seg_link);
rbuf->rbuf_segment_cnt++;
seg->seg_absof = rbuf->rbuf_len;
rbuf->rbuf_len += seg->seg_of;
rbuf->rbuf_size += seg->seg_size;
/* Update writable position */
if (!rbuf->rbuf_wpos)
rbuf->rbuf_wpos = seg;
else
rd_buf_get_writable0(rbuf, NULL, NULL);
return seg;
}
/**
* @brief Attempt to allocate \p size bytes from the buffers extra buffers.
* @returns the allocated pointer which MUST NOT be freed, or NULL if
* not enough memory.
* @remark the returned pointer is memory-aligned to be safe.
*/
static void *extra_alloc (rd_buf_t *rbuf, size_t size) {
size_t of = RD_ROUNDUP(rbuf->rbuf_extra_len, 8); /* FIXME: 32-bit */
void *p;
if (of + size > rbuf->rbuf_extra_size)
return NULL;
p = rbuf->rbuf_extra + of; /* Aligned pointer */
rbuf->rbuf_extra_len = of + size;
return p;
}
/**
* @brief Get a pre-allocated segment if available, or allocate a new
* segment with the extra amount of \p size bytes allocated for payload.
*
* Will not append the segment to the buffer.
*/
static rd_segment_t *
rd_buf_alloc_segment0 (rd_buf_t *rbuf, size_t size) {
rd_segment_t *seg;
/* See if there is enough room in the extra buffer for
* allocating the segment header and the buffer,
* or just the segment header, else fall back to malloc. */
if ((seg = extra_alloc(rbuf, sizeof(*seg) + size))) {
rd_segment_init(seg, size > 0 ? seg+1 : NULL, size);
} else if ((seg = extra_alloc(rbuf, sizeof(*seg)))) {
rd_segment_init(seg, size > 0 ? rd_malloc(size) : NULL, size);
if (size > 0)
seg->seg_free = rd_free;
} else if ((seg = rd_malloc(sizeof(*seg) + size))) {
rd_segment_init(seg, size > 0 ? seg+1 : NULL, size);
seg->seg_flags |= RD_SEGMENT_F_FREE;
} else
rd_assert(!*"segment allocation failure");
return seg;
}
/**
* @brief Allocate between \p min_size .. \p max_size of backing memory
* and add it as a new segment to the buffer.
*
* The buffer position is updated to point to the new segment.
*
* The segment will be over-allocated if permitted by max_size
* (max_size == 0 or max_size > min_size).
*/
static rd_segment_t *
rd_buf_alloc_segment (rd_buf_t *rbuf, size_t min_size, size_t max_size) {
rd_segment_t *seg;
/* Over-allocate if allowed. */
if (min_size != max_size || max_size == 0)
max_size = RD_MAX(sizeof(*seg) * 4,
RD_MAX(min_size * 2,
rbuf->rbuf_size / 2));
seg = rd_buf_alloc_segment0(rbuf, max_size);
rd_buf_append_segment(rbuf, seg);
return seg;
}
/**
* @brief Ensures that \p size bytes will be available
* for writing and the position will be updated to point to the
* start of this contiguous block.
*/
void rd_buf_write_ensure_contig (rd_buf_t *rbuf, size_t size) {
rd_segment_t *seg = rbuf->rbuf_wpos;
if (seg) {
void *p;
size_t remains = rd_segment_write_remains(seg, &p);
if (remains >= size)
return; /* Existing segment has enough space. */
/* Future optimization:
* If existing segment has enough remaining space to warrant
* a split, do it, before allocating a new one. */
}
/* Allocate new segment */
rbuf->rbuf_wpos = rd_buf_alloc_segment(rbuf, size, size);
}
/**
* @brief Ensures that at least \p size bytes will be available for
* a future write.
*
* Typically used prior to a call to rd_buf_get_write_iov()
*/
void rd_buf_write_ensure (rd_buf_t *rbuf, size_t min_size, size_t max_size) {
size_t remains;
while ((remains = rd_buf_write_remains(rbuf)) < min_size)
rd_buf_alloc_segment(rbuf,
min_size - remains,
max_size ? max_size - remains : 0);
}
/**
* @returns the segment at absolute offset \p absof, or NULL if out of range.
*
* @remark \p hint is an optional segment where to start looking, such as
* the current write or read position.
*/
rd_segment_t *
rd_buf_get_segment_at_offset (const rd_buf_t *rbuf, const rd_segment_t *hint,
size_t absof) {
const rd_segment_t *seg = hint;
if (unlikely(absof > rbuf->rbuf_len))
return NULL;
/* Only use current write position if possible and if it helps */
if (!seg || absof < seg->seg_absof)
seg = TAILQ_FIRST(&rbuf->rbuf_segments);
do {
if (absof >= seg->seg_absof &&
absof < seg->seg_absof + seg->seg_of) {
rd_dassert(seg->seg_absof <= rd_buf_len(rbuf));
return (rd_segment_t *)seg;
}
} while ((seg = TAILQ_NEXT(seg, seg_link)));
return NULL;
}
/**
* @brief Split segment \p seg at absolute offset \p absof, appending
* a new segment after \p seg with its memory pointing to the
* memory starting at \p absof.
* \p seg 's memory will be shorted to the \p absof.
*
* The new segment is NOT appended to the buffer.
*
* @warning MUST ONLY be used on the LAST segment
*
* @warning if a segment is inserted between these two splitted parts
* it is imperative that the later segment's absof is corrected.
*
* @remark The seg_free callback is retained on the original \p seg
* and is not copied to the new segment, but flags are copied.
*/
static rd_segment_t *rd_segment_split (rd_buf_t *rbuf, rd_segment_t *seg,
size_t absof) {
rd_segment_t *newseg;
size_t relof;
rd_assert(seg == rbuf->rbuf_wpos);
rd_assert(absof >= seg->seg_absof &&
absof <= seg->seg_absof + seg->seg_of);
relof = absof - seg->seg_absof;
newseg = rd_buf_alloc_segment0(rbuf, 0);
/* Add later part of split bytes to new segment */
newseg->seg_p = seg->seg_p+relof;
newseg->seg_of = seg->seg_of-relof;
newseg->seg_size = seg->seg_size-relof;
newseg->seg_absof = SIZE_MAX; /* Invalid */
newseg->seg_flags |= seg->seg_flags;
/* Remove earlier part of split bytes from previous segment */
seg->seg_of = relof;
seg->seg_size = relof;
/* newseg's length will be added to rbuf_len in append_segment(),
* so shave it off here from seg's perspective. */
rbuf->rbuf_len -= newseg->seg_of;
rbuf->rbuf_size -= newseg->seg_size;
return newseg;
}
/**
* @brief Unlink and destroy a segment, updating the \p rbuf
* with the decrease in length and capacity.
*/
static void rd_buf_destroy_segment (rd_buf_t *rbuf, rd_segment_t *seg) {
rd_assert(rbuf->rbuf_segment_cnt > 0 &&
rbuf->rbuf_len >= seg->seg_of &&
rbuf->rbuf_size >= seg->seg_size);
TAILQ_REMOVE(&rbuf->rbuf_segments, seg, seg_link);
rbuf->rbuf_segment_cnt--;
rbuf->rbuf_len -= seg->seg_of;
rbuf->rbuf_size -= seg->seg_size;
rd_dassert(rbuf->rbuf_len <= seg->seg_absof);
if (rbuf->rbuf_wpos == seg)
rbuf->rbuf_wpos = NULL;
rd_segment_destroy(seg);
}
/**
* @brief Free memory associated with the \p rbuf, but not the rbuf itself.
* Segments will be destroyed.
*/
void rd_buf_destroy (rd_buf_t *rbuf) {
rd_segment_t *seg, *tmp;
#if ENABLE_DEVEL
/* FIXME */
if (rbuf->rbuf_len > 0 && 0) {
size_t overalloc = rbuf->rbuf_size - rbuf->rbuf_len;
float fill_grade = (float)rbuf->rbuf_len /
(float)rbuf->rbuf_size;
printf("fill grade: %.2f%% (%zu bytes over-allocated)\n",
fill_grade * 100.0f, overalloc);
}
#endif
TAILQ_FOREACH_SAFE(seg, &rbuf->rbuf_segments, seg_link, tmp) {
rd_segment_destroy(seg);
}
if (rbuf->rbuf_extra)
rd_free(rbuf->rbuf_extra);
}
/**
* @brief Initialize buffer, pre-allocating \p fixed_seg_cnt segments
* where the first segment will have a \p buf_size of backing memory.
*
* The caller may rearrange the backing memory as it see fits.
*/
void rd_buf_init (rd_buf_t *rbuf, size_t fixed_seg_cnt, size_t buf_size) {
size_t totalloc = 0;
memset(rbuf, 0, sizeof(*rbuf));
TAILQ_INIT(&rbuf->rbuf_segments);
if (!fixed_seg_cnt) {
assert(!buf_size);
return;
}
/* Pre-allocate memory for a fixed set of segments that are known
* before-hand, to minimize the number of extra allocations
* needed for well-known layouts (such as headers, etc) */
totalloc += RD_ROUNDUP(sizeof(rd_segment_t), 8) * fixed_seg_cnt;
/* Pre-allocate extra space for the backing buffer. */
totalloc += buf_size;
rbuf->rbuf_extra_size = totalloc;
rbuf->rbuf_extra = rd_malloc(rbuf->rbuf_extra_size);
}
/**
* @brief Convenience writer iterator interface.
*
* After writing to \p p the caller must update the written length
* by calling rd_buf_write(rbuf, NULL, written_length)
*
* @returns the number of contiguous writable bytes in segment
* and sets \p *p to point to the start of the memory region.
*/
static size_t
rd_buf_get_writable0 (rd_buf_t *rbuf, rd_segment_t **segp, void **p) {
rd_segment_t *seg;
for (seg = rbuf->rbuf_wpos ; seg ; seg = TAILQ_NEXT(seg, seg_link)) {
size_t len = rd_segment_write_remains(seg, p);
/* Even though the write offset hasn't changed we
* avoid future segment scans by adjusting the
* wpos here to the first writable segment. */
rbuf->rbuf_wpos = seg;
if (segp)
*segp = seg;
if (unlikely(len == 0))
continue;
/* Also adjust absof if the segment was allocated
* before the previous segment's memory was exhausted
* and thus now might have a lower absolute offset
* than the previos segment's now higher relative offset. */
if (seg->seg_of == 0 && seg->seg_absof < rbuf->rbuf_len)
seg->seg_absof = rbuf->rbuf_len;
return len;
}
return 0;
}
size_t rd_buf_get_writable (rd_buf_t *rbuf, void **p) {
rd_segment_t *seg;
return rd_buf_get_writable0(rbuf, &seg, p);
}
/**
* @brief Write \p payload of \p size bytes to current position
* in buffer. A new segment will be allocated and appended
* if needed.
*
* @returns the write position where payload was written (pre-write).
* Returning the pre-positition allows write_update() to later
* update the same location, effectively making write()s
* also a place-holder mechanism.
*
* @remark If \p payload is NULL only the write position is updated,
* in this mode it is required for the buffer to have enough
* memory for the NULL write (as it would otherwise cause
* uninitialized memory in any new segments allocated from this
* function).
*/
size_t rd_buf_write (rd_buf_t *rbuf, const void *payload, size_t size) {
size_t remains = size;
size_t initial_absof;
const char *psrc = (const char *)payload;
initial_absof = rbuf->rbuf_len;
/* Ensure enough space by pre-allocating segments. */
rd_buf_write_ensure(rbuf, size, 0);
while (remains > 0) {
void *p;
rd_segment_t *seg;
size_t segremains = rd_buf_get_writable0(rbuf, &seg, &p);
size_t wlen = RD_MIN(remains, segremains);
rd_dassert(seg == rbuf->rbuf_wpos);
rd_dassert(wlen > 0);
rd_dassert(seg->seg_p+seg->seg_of <= (char *)p &&
(char *)p < seg->seg_p+seg->seg_size);
if (payload) {
memcpy(p, psrc, wlen);
psrc += wlen;
}
seg->seg_of += wlen;
rbuf->rbuf_len += wlen;
remains -= wlen;
}
rd_assert(remains == 0);
return initial_absof;
}
/**
* @brief Write \p slice to \p rbuf
*
* @remark The slice position will be updated.
*
* @returns the number of bytes witten (always slice length)
*/
size_t rd_buf_write_slice (rd_buf_t *rbuf, rd_slice_t *slice) {
const void *p;
size_t rlen;
size_t sum = 0;
while ((rlen = rd_slice_reader(slice, &p))) {
size_t r;
r = rd_buf_write(rbuf, p, rlen);
rd_dassert(r != 0);
sum += r;
}
return sum;
}
/**
* @brief Write \p payload of \p size at absolute offset \p absof
* WITHOUT updating the total buffer length.
*
* This is used to update a previously written region, such
* as updating the header length.
*
* @returns the number of bytes written, which may be less than \p size
* if the update spans multiple segments.
*/
static size_t rd_segment_write_update (rd_segment_t *seg, size_t absof,
const void *payload, size_t size) {
size_t relof;
size_t wlen;
rd_dassert(absof >= seg->seg_absof);
relof = absof - seg->seg_absof;
rd_assert(relof <= seg->seg_of);
wlen = RD_MIN(size, seg->seg_of - relof);
rd_dassert(relof + wlen <= seg->seg_of);
memcpy(seg->seg_p+relof, payload, wlen);
return wlen;
}
/**
* @brief Write \p payload of \p size at absolute offset \p absof
* WITHOUT updating the total buffer length.
*
* This is used to update a previously written region, such
* as updating the header length.
*/
size_t rd_buf_write_update (rd_buf_t *rbuf, size_t absof,
const void *payload, size_t size) {
rd_segment_t *seg;
const char *psrc = (const char *)payload;
size_t of;
/* Find segment for offset */
seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof);
rd_assert(seg && *"invalid absolute offset");
for (of = 0 ; of < size ; seg = TAILQ_NEXT(seg, seg_link)) {
rd_assert(seg->seg_absof <= rd_buf_len(rbuf));
size_t wlen = rd_segment_write_update(seg, absof+of,
psrc+of, size-of);
of += wlen;
}
rd_dassert(of == size);
return of;
}
/**
* @brief Push reference memory segment to current write position.
*/
void rd_buf_push (rd_buf_t *rbuf, const void *payload, size_t size,
void (*free_cb)(void *)) {
rd_segment_t *prevseg, *seg, *tailseg = NULL;
if ((prevseg = rbuf->rbuf_wpos) &&
rd_segment_write_remains(prevseg, NULL) > 0) {
/* If the current segment still has room in it split it
* and insert the pushed segment in the middle (below). */
tailseg = rd_segment_split(rbuf, prevseg,
prevseg->seg_absof +
prevseg->seg_of);
}
seg = rd_buf_alloc_segment0(rbuf, 0);
seg->seg_p = (char *)payload;
seg->seg_size = size;
seg->seg_of = size;
seg->seg_free = free_cb;
seg->seg_flags |= RD_SEGMENT_F_RDONLY;
rd_buf_append_segment(rbuf, seg);
if (tailseg)
rd_buf_append_segment(rbuf, tailseg);
}
/**
* @brief Do a write-seek, updating the write position to the given
* absolute \p absof.
*
* @warning Any sub-sequent segments will be destroyed.
*
* @returns -1 if the offset is out of bounds, else 0.
*/
int rd_buf_write_seek (rd_buf_t *rbuf, size_t absof) {
rd_segment_t *seg, *next;
size_t relof;
seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof);
if (unlikely(!seg))
return -1;
relof = absof - seg->seg_absof;
if (unlikely(relof > seg->seg_of))
return -1;
/* Destroy sub-sequent segments in reverse order so that
* destroy_segment() length checks are correct.
* Will decrement rbuf_len et.al. */
for (next = TAILQ_LAST(&rbuf->rbuf_segments, rd_segment_head) ;
next != seg ; next = TAILQ_PREV(next, rd_segment_head, seg_link))
rd_buf_destroy_segment(rbuf, next);
/* Update relative write offset */
seg->seg_of = relof;
rbuf->rbuf_wpos = seg;
rbuf->rbuf_len = seg->seg_absof + seg->seg_of;
rd_assert(rbuf->rbuf_len == absof);
return 0;
}
/**
* @brief Set up the iovecs in \p iovs (of size \p iov_max) with the writable
* segments from the buffer's current write position.
*
* @param iovcntp will be set to the number of populated \p iovs[]
* @param size_max limits the total number of bytes made available.
* Note: this value may be overshot with the size of one
* segment.
*
* @returns the total number of bytes in the represented segments.
*
* @remark the write position will NOT be updated.
*/
size_t rd_buf_get_write_iov (const rd_buf_t *rbuf,
struct iovec *iovs, size_t *iovcntp,
size_t iov_max, size_t size_max) {
const rd_segment_t *seg;
size_t iovcnt = 0;
size_t sum = 0;
for (seg = rbuf->rbuf_wpos ;
seg && iovcnt < iov_max && sum < size_max ;
seg = TAILQ_NEXT(seg, seg_link)) {
size_t len;
void *p;
len = rd_segment_write_remains(seg, &p);
if (unlikely(len == 0))
continue;
iovs[iovcnt].iov_base = p;
iovs[iovcnt++].iov_len = len;
sum += len;
}
*iovcntp = iovcnt;
return sum;
}
/**
* @name Slice reader interface
*
* @{
*/
/**
* @brief Initialize a new slice of \p size bytes starting at \p seg with
* relative offset \p rof.
*
* @returns 0 on success or -1 if there is not at least \p size bytes available
* in the buffer.
*/
int rd_slice_init_seg (rd_slice_t *slice, const rd_buf_t *rbuf,
const rd_segment_t *seg, size_t rof, size_t size) {
/* Verify that \p size bytes are indeed available in the buffer. */
if (unlikely(rbuf->rbuf_len < (seg->seg_absof + rof + size)))
return -1;
slice->buf = rbuf;
slice->seg = seg;
slice->rof = rof;
slice->start = seg->seg_absof + rof;
slice->end = slice->start + size;
rd_assert(seg->seg_absof+rof >= slice->start &&
seg->seg_absof+rof <= slice->end);
rd_assert(slice->end <= rd_buf_len(rbuf));
return 0;
}
/**
* @brief Initialize new slice of \p size bytes starting at offset \p absof
*
* @returns 0 on success or -1 if there is not at least \p size bytes available
* in the buffer.
*/
int rd_slice_init (rd_slice_t *slice, const rd_buf_t *rbuf,
size_t absof, size_t size) {
const rd_segment_t *seg = rd_buf_get_segment_at_offset(rbuf, NULL,
absof);
if (unlikely(!seg))
return -1;
return rd_slice_init_seg(slice, rbuf, seg,
absof - seg->seg_absof, size);
}
/**
* @brief Initialize new slice covering the full buffer \p rbuf
*/
void rd_slice_init_full (rd_slice_t *slice, const rd_buf_t *rbuf) {
int r = rd_slice_init(slice, rbuf, 0, rd_buf_len(rbuf));
rd_assert(r == 0);
}
/**
* @sa rd_slice_reader() rd_slice_peeker()
*/
size_t rd_slice_reader0 (rd_slice_t *slice, const void **p, int update_pos) {
size_t rof = slice->rof;
size_t rlen;
const rd_segment_t *seg;
/* Find segment with non-zero payload */
for (seg = slice->seg ;
seg && seg->seg_absof+rof < slice->end && seg->seg_of == rof ;
seg = TAILQ_NEXT(seg, seg_link))
rof = 0;
if (unlikely(!seg || seg->seg_absof+rof >= slice->end))
return 0;
rd_assert(seg->seg_absof+rof <= slice->end);
*p = (const void *)(seg->seg_p + rof);
rlen = RD_MIN(seg->seg_of - rof, rd_slice_remains(slice));
if (update_pos) {
if (slice->seg != seg) {
rd_assert(seg->seg_absof + rof >= slice->start &&
seg->seg_absof + rof+rlen <= slice->end);
slice->seg = seg;
slice->rof = rlen;
} else {
slice->rof += rlen;
}
}
return rlen;
}
/**
* @brief Convenience reader iterator interface.
*
* Call repeatedly from while loop until it returns 0.
*
* @param slice slice to read from, position will be updated.
* @param p will be set to the start of \p *rlenp contiguous bytes of memory
* @param rlenp will be set to the number of bytes available in \p p
*
* @returns the number of bytes read, or 0 if slice is empty.
*/
size_t rd_slice_reader (rd_slice_t *slice, const void **p) {
return rd_slice_reader0(slice, p, 1/*update_pos*/);
}
/**
* @brief Identical to rd_slice_reader() but does NOT update the read position
*/
size_t rd_slice_peeker (const rd_slice_t *slice, const void **p) {
return rd_slice_reader0((rd_slice_t *)slice, p, 0/*dont update_pos*/);
}
/**
* @brief Read \p size bytes from current read position,
* advancing the read offset by the number of bytes copied to \p dst.
*
* If there are less than \p size remaining in the buffer
* then 0 is returned and no bytes are copied.
*
* @returns \p size, or 0 if \p size bytes are not available in buffer.
*
* @remark This performs a complete read, no partitial reads.
*
* @remark If \p dst is NULL only the read position is updated.
*/
size_t rd_slice_read (rd_slice_t *slice, void *dst, size_t size) {
size_t remains = size;
char *d = (char *)dst; /* Possibly NULL */
size_t rlen;
const void *p;
size_t orig_end = slice->end;
if (unlikely(rd_slice_remains(slice) < size))
return 0;
/* Temporarily shrink slice to offset + \p size */
slice->end = rd_slice_abs_offset(slice) + size;
while ((rlen = rd_slice_reader(slice, &p))) {
rd_dassert(remains >= rlen);
if (dst) {
memcpy(d, p, rlen);
d += rlen;
}
remains -= rlen;
}
rd_dassert(remains == 0);
/* Restore original size */
slice->end = orig_end;
return size;
}
/**
* @brief Read \p size bytes from absolute slice offset \p offset
* and store in \p dst, without updating the slice read position.
*
* @returns \p size if the offset and size was within the slice, else 0.
*/
size_t rd_slice_peek (const rd_slice_t *slice, size_t offset,
void *dst, size_t size) {
rd_slice_t sub = *slice;
if (unlikely(rd_slice_seek(&sub, offset) == -1))
return 0;
return rd_slice_read(&sub, dst, size);
}
/**
* @returns a pointer to \p size contiguous bytes at the current read offset.
* If there isn't \p size contiguous bytes available NULL will
* be returned.
*
* @remark The read position is updated to point past \p size.
*/
const void *rd_slice_ensure_contig (rd_slice_t *slice, size_t size) {
void *p;
if (unlikely(rd_slice_remains(slice) < size ||
slice->rof + size > slice->seg->seg_of))
return NULL;
p = slice->seg->seg_p + slice->rof;
rd_slice_read(slice, NULL, size);
return p;
}
/**
* @brief Sets the slice's read position. The offset is the slice offset,
* not buffer offset.
*
* @returns 0 if offset was within range, else -1 in which case the position
* is not changed.
*/
int rd_slice_seek (rd_slice_t *slice, size_t offset) {
const rd_segment_t *seg;
size_t absof = slice->start + offset;
if (unlikely(absof >= slice->end))
return -1;
seg = rd_buf_get_segment_at_offset(slice->buf, slice->seg, absof);
rd_assert(seg);
slice->seg = seg;
slice->rof = absof - seg->seg_absof;
rd_assert(seg->seg_absof + slice->rof >= slice->start &&
seg->seg_absof + slice->rof <= slice->end);
return 0;
}
/**
* @brief Narrow the current slice to \p size, saving
* the original slice state info \p save_slice.
*
* Use rd_slice_widen() to restore the saved slice
* with the read count updated from the narrowed slice.
*
* This is useful for reading a sub-slice of a larger slice
* without having to pass the lesser length around.
*
* @returns 1 if enough underlying slice buffer memory is available, else 0.
*/
int rd_slice_narrow (rd_slice_t *slice, rd_slice_t *save_slice, size_t size) {
if (unlikely(slice->start + size > slice->end))
return 0;
*save_slice = *slice;
slice->end = slice->start + size;
rd_assert(rd_slice_abs_offset(slice) <= slice->end);
return 1;
}
/**
* @brief Same as rd_slice_narrow() but using a relative size \p relsize
* from the current read position.
*/
int rd_slice_narrow_relative (rd_slice_t *slice, rd_slice_t *save_slice,
size_t relsize) {
return rd_slice_narrow(slice, save_slice,
rd_slice_offset(slice) + relsize);
}
/**
* @brief Restore the original \p save_slice size from a previous call to
* rd_slice_narrow(), while keeping the updated read pointer from
* \p slice.
*/
void rd_slice_widen (rd_slice_t *slice, const rd_slice_t *save_slice) {
slice->end = save_slice->end;
}
/**
* @brief Copy the original slice \p orig to \p new_slice and adjust
* the new slice length to \p size.
*
* This is a side-effect free form of rd_slice_narrow() which is not to
* be used with rd_slice_widen().
*
* @returns 1 if enough underlying slice buffer memory is available, else 0.
*/
int rd_slice_narrow_copy (const rd_slice_t *orig, rd_slice_t *new_slice,
size_t size) {
if (unlikely(orig->start + size > orig->end))
return 0;
*new_slice = *orig;
new_slice->end = orig->start + size;
rd_assert(rd_slice_abs_offset(new_slice) <= new_slice->end);
return 1;
}
/**
* @brief Same as rd_slice_narrow_copy() but with a relative size from
* the current read position.
*/
int rd_slice_narrow_copy_relative (const rd_slice_t *orig,
rd_slice_t *new_slice,
size_t relsize) {
return rd_slice_narrow_copy(orig, new_slice,
rd_slice_offset(orig) + relsize);
}
/**
* @brief Set up the iovec \p iovs (of size \p iov_max) with the readable
* segments from the slice's current read position.
*
* @param iovcntp will be set to the number of populated \p iovs[]
* @param size_max limits the total number of bytes made available.
* Note: this value may be overshot with the size of one
* segment.
*
* @returns the total number of bytes in the represented segments.
*
* @remark will NOT update the read position.
*/
size_t rd_slice_get_iov (const rd_slice_t *slice,
struct iovec *iovs, size_t *iovcntp,
size_t iov_max, size_t size_max) {
const void *p;
size_t rlen;
size_t iovcnt = 0;
size_t sum = 0;
rd_slice_t copy = *slice; /* Use a copy of the slice so we dont
* update the position for the caller. */
while (sum < size_max && iovcnt < iov_max &&
(rlen = rd_slice_reader(&copy, &p))) {
iovs[iovcnt].iov_base = (void *)p;
iovs[iovcnt++].iov_len = rlen;
sum += rlen;
}
*iovcntp = iovcnt;
return sum;
}
/**
* @brief CRC32 calculation of slice.
*
* @returns the calculated CRC
*
* @remark the slice's position is updated.
*/
uint32_t rd_slice_crc32 (rd_slice_t *slice) {
rd_crc32_t crc;
const void *p;
size_t rlen;
crc = rd_crc32_init();
while ((rlen = rd_slice_reader(slice, &p)))
crc = rd_crc32_update(crc, p, rlen);
return (uint32_t)rd_crc32_finalize(crc);
}
/**
* @brief Compute CRC-32C of segments starting at at buffer position \p absof,
* also supporting the case where the position/offset is not at the
* start of the first segment.
*
* @remark the slice's position is updated.
*/
uint32_t rd_slice_crc32c (rd_slice_t *slice) {
const void *p;
size_t rlen;
uint32_t crc = 0;
while ((rlen = rd_slice_reader(slice, &p)))
crc = crc32c(crc, (const char *)p, rlen);
return crc;
}
/**
* @name Debugging dumpers
*
*
*/
static void rd_segment_dump (const rd_segment_t *seg, const char *ind,
size_t relof, int do_hexdump) {
fprintf(stderr,
"%s((rd_segment_t *)%p): "
"p %p, of %"PRIusz", "
"absof %"PRIusz", size %"PRIusz", free %p, flags 0x%x\n",
ind, seg, seg->seg_p, seg->seg_of,
seg->seg_absof, seg->seg_size, seg->seg_free, seg->seg_flags);
rd_assert(relof <= seg->seg_of);
if (do_hexdump)
rd_hexdump(stderr, "segment",
seg->seg_p+relof, seg->seg_of-relof);
}
void rd_buf_dump (const rd_buf_t *rbuf, int do_hexdump) {
const rd_segment_t *seg;
fprintf(stderr,
"((rd_buf_t *)%p):\n"
" len %"PRIusz" size %"PRIusz
", %"PRIusz"/%"PRIusz" extra memory used\n",
rbuf, rbuf->rbuf_len, rbuf->rbuf_size,
rbuf->rbuf_extra_len, rbuf->rbuf_extra_size);
if (rbuf->rbuf_wpos) {
fprintf(stderr, " wpos:\n");
rd_segment_dump(rbuf->rbuf_wpos, " ", 0, 0);
}
if (rbuf->rbuf_segment_cnt > 0) {
size_t segcnt = 0;
fprintf(stderr, " %"PRIusz" linked segments:\n",
rbuf->rbuf_segment_cnt);
TAILQ_FOREACH(seg, &rbuf->rbuf_segments, seg_link) {
rd_segment_dump(seg, " ", 0, do_hexdump);
rd_assert(++segcnt <= rbuf->rbuf_segment_cnt);
}
}
}
void rd_slice_dump (const rd_slice_t *slice, int do_hexdump) {
const rd_segment_t *seg;
size_t relof;
fprintf(stderr,
"((rd_slice_t *)%p):\n"
" buf %p (len %"PRIusz"), seg %p (absof %"PRIusz"), "
"rof %"PRIusz", start %"PRIusz", end %"PRIusz", size %"PRIusz
", offset %"PRIusz"\n",
slice, slice->buf, rd_buf_len(slice->buf),
slice->seg, slice->seg ? slice->seg->seg_absof : 0,
slice->rof, slice->start, slice->end,
rd_slice_size(slice), rd_slice_offset(slice));
relof = slice->rof;
for (seg = slice->seg ; seg ; seg = TAILQ_NEXT(seg, seg_link)) {
rd_segment_dump(seg, " ", relof, do_hexdump);
relof = 0;
}
}
/**
* @name Unit-tests
*
*
*
*/
/**
* @brief Basic write+read test
*/
static int do_unittest_write_read (void) {
rd_buf_t b;
char ones[1024];
char twos[1024];
char threes[1024];
char fiftyfives[100]; /* 0x55 indicates "untouched" memory */
char buf[1024*3];
rd_slice_t slice;
size_t r, pos;
memset(ones, 0x1, sizeof(ones));
memset(twos, 0x2, sizeof(twos));
memset(threes, 0x3, sizeof(threes));
memset(fiftyfives, 0x55, sizeof(fiftyfives));
memset(buf, 0x55, sizeof(buf));
rd_buf_init(&b, 2, 1000);
/*
* Verify write
*/
r = rd_buf_write(&b, ones, 200);
RD_UT_ASSERT(r == 0, "write() returned position %"PRIusz, r);
pos = rd_buf_write_pos(&b);
RD_UT_ASSERT(pos == 200, "pos() returned position %"PRIusz, pos);
r = rd_buf_write(&b, twos, 800);
RD_UT_ASSERT(pos == 200, "write() returned position %"PRIusz, r);
pos = rd_buf_write_pos(&b);
RD_UT_ASSERT(pos == 200+800, "pos() returned position %"PRIusz, pos);
/* Buffer grows here */
r = rd_buf_write(&b, threes, 1);
RD_UT_ASSERT(pos == 200+800,
"write() returned position %"PRIusz, r);
pos = rd_buf_write_pos(&b);
RD_UT_ASSERT(pos == 200+800+1, "pos() returned position %"PRIusz, pos);
/*
* Verify read
*/
/* Get full slice. */
rd_slice_init_full(&slice, &b);
r = rd_slice_read(&slice, buf, 200+800+2);
RD_UT_ASSERT(r == 0,
"read() > remaining should have failed, gave %"PRIusz, r);
r = rd_slice_read(&slice, buf, 200+800+1);
RD_UT_ASSERT(r == 200+800+1,
"read() returned %"PRIusz" (%"PRIusz" remains)",
r, rd_slice_remains(&slice));
RD_UT_ASSERT(!memcmp(buf, ones, 200), "verify ones");
RD_UT_ASSERT(!memcmp(buf+200, twos, 800), "verify twos");
RD_UT_ASSERT(!memcmp(buf+200+800, threes, 1), "verify threes");
RD_UT_ASSERT(!memcmp(buf+200+800+1, fiftyfives, 100), "verify 55s");
rd_buf_destroy(&b);
RD_UT_PASS();
}
/**
* @brief Helper read verifier, not a unit-test itself.
*/
#define do_unittest_read_verify(b,absof,len,verify) do { \
int __fail = do_unittest_read_verify0(b,absof,len,verify); \
RD_UT_ASSERT(!__fail, \
"read_verify(absof=%"PRIusz",len=%"PRIusz") " \
"failed", (size_t)absof, (size_t)len); \
} while (0)
static int
do_unittest_read_verify0 (const rd_buf_t *b, size_t absof, size_t len,
const char *verify) {
rd_slice_t slice, sub;
char buf[1024];
size_t half;
size_t r;
int i;
rd_assert(sizeof(buf) >= len);
/* Get reader slice */
i = rd_slice_init(&slice, b, absof, len);
RD_UT_ASSERT(i == 0, "slice_init() failed: %d", i);
r = rd_slice_read(&slice, buf, len);
RD_UT_ASSERT(r == len,
"read() returned %"PRIusz" expected %"PRIusz
" (%"PRIusz" remains)",
r, len, rd_slice_remains(&slice));
RD_UT_ASSERT(!memcmp(buf, verify, len), "verify");
r = rd_slice_offset(&slice);
RD_UT_ASSERT(r == len, "offset() returned %"PRIusz", not %"PRIusz,
r, len);
half = len / 2;
i = rd_slice_seek(&slice, half);
RD_UT_ASSERT(i == 0, "seek(%"PRIusz") returned %d", half, i);
r = rd_slice_offset(&slice);
RD_UT_ASSERT(r == half, "offset() returned %"PRIusz", not %"PRIusz,
r, half);
/* Get a sub-slice covering the later half. */
sub = rd_slice_pos(&slice);
r = rd_slice_offset(&sub);
RD_UT_ASSERT(r == 0, "sub: offset() returned %"PRIusz", not %"PRIusz,
r, (size_t)0);
r = rd_slice_size(&sub);
RD_UT_ASSERT(r == half, "sub: size() returned %"PRIusz", not %"PRIusz,
r, half);
r = rd_slice_remains(&sub);
RD_UT_ASSERT(r == half,
"sub: remains() returned %"PRIusz", not %"PRIusz,
r, half);
/* Read half */
r = rd_slice_read(&sub, buf, half);
RD_UT_ASSERT(r == half,
"sub read() returned %"PRIusz" expected %"PRIusz
" (%"PRIusz" remains)",
r, len, rd_slice_remains(&sub));
RD_UT_ASSERT(!memcmp(buf, verify, len), "verify");
r = rd_slice_offset(&sub);
RD_UT_ASSERT(r == rd_slice_size(&sub),
"sub offset() returned %"PRIusz", not %"PRIusz,
r, rd_slice_size(&sub));
r = rd_slice_remains(&sub);
RD_UT_ASSERT(r == 0,
"sub: remains() returned %"PRIusz", not %"PRIusz,
r, (size_t)0);
return 0;
}
/**
* @brief write_seek() and split() test
*/
static int do_unittest_write_split_seek (void) {
rd_buf_t b;
char ones[1024];
char twos[1024];
char threes[1024];
char fiftyfives[100]; /* 0x55 indicates "untouched" memory */
char buf[1024*3];
size_t r, pos;
rd_segment_t *seg, *newseg;
memset(ones, 0x1, sizeof(ones));
memset(twos, 0x2, sizeof(twos));
memset(threes, 0x3, sizeof(threes));
memset(fiftyfives, 0x55, sizeof(fiftyfives));
memset(buf, 0x55, sizeof(buf));
rd_buf_init(&b, 0, 0);
/*
* Verify write
*/
r = rd_buf_write(&b, ones, 400);
RD_UT_ASSERT(r == 0, "write() returned position %"PRIusz, r);
pos = rd_buf_write_pos(&b);
RD_UT_ASSERT(pos == 400, "pos() returned position %"PRIusz, pos);
do_unittest_read_verify(&b, 0, 400, ones);
/*
* Seek and re-write
*/
r = rd_buf_write_seek(&b, 200);
RD_UT_ASSERT(r == 0, "seek() failed");
pos = rd_buf_write_pos(&b);
RD_UT_ASSERT(pos == 200, "pos() returned position %"PRIusz, pos);
r = rd_buf_write(&b, twos, 100);
RD_UT_ASSERT(pos == 200, "write() returned position %"PRIusz, r);
pos = rd_buf_write_pos(&b);
RD_UT_ASSERT(pos == 200+100, "pos() returned position %"PRIusz, pos);
do_unittest_read_verify(&b, 0, 200, ones);
do_unittest_read_verify(&b, 200, 100, twos);
/* Make sure read() did not modify the write position. */
pos = rd_buf_write_pos(&b);
RD_UT_ASSERT(pos == 200+100, "pos() returned position %"PRIusz, pos);
/* Split buffer, write position is now at split where writes
* are not allowed (mid buffer). */
seg = rd_buf_get_segment_at_offset(&b, NULL, 50);
RD_UT_ASSERT(seg->seg_of != 0, "assumed mid-segment");
newseg = rd_segment_split(&b, seg, 50);
rd_buf_append_segment(&b, newseg);
seg = rd_buf_get_segment_at_offset(&b, NULL, 50);
RD_UT_ASSERT(seg != NULL, "seg");
RD_UT_ASSERT(seg == newseg, "newseg %p, seg %p", newseg, seg);
RD_UT_ASSERT(seg->seg_of > 0,
"assumed beginning of segment, got %"PRIusz, seg->seg_of);
pos = rd_buf_write_pos(&b);
RD_UT_ASSERT(pos == 200+100, "pos() returned position %"PRIusz, pos);
/* Re-verify that nothing changed */
do_unittest_read_verify(&b, 0, 200, ones);
do_unittest_read_verify(&b, 200, 100, twos);
/* Do a write seek at buffer boundary, sub-sequent buffers should
* be destroyed. */
r = rd_buf_write_seek(&b, 50);
RD_UT_ASSERT(r == 0, "seek() failed");
do_unittest_read_verify(&b, 0, 50, ones);
rd_buf_destroy(&b);
RD_UT_PASS();
}
/**
* @brief Unittest to verify payload is correctly written and read.
* Each written u32 word is the running CRC of the word count.
*/
static int do_unittest_write_read_payload_correctness (void) {
uint32_t crc;
uint32_t write_crc, read_crc;
const int seed = 12345;
rd_buf_t b;
const size_t max_cnt = 20000;
rd_slice_t slice;
size_t r;
size_t i;
int pass;
crc = rd_crc32_init();
crc = rd_crc32_update(crc, (void *)&seed, sizeof(seed));
rd_buf_init(&b, 0, 0);
for (i = 0 ; i < max_cnt ; i++) {
crc = rd_crc32_update(crc, (void *)&i, sizeof(i));
rd_buf_write(&b, &crc, sizeof(crc));
}
write_crc = rd_crc32_finalize(crc);
r = rd_buf_len(&b);
RD_UT_ASSERT(r == max_cnt * sizeof(crc),
"expected length %"PRIusz", not %"PRIusz,
r, max_cnt * sizeof(crc));
/*
* Now verify the contents with a reader.
*/
rd_slice_init_full(&slice, &b);
r = rd_slice_remains(&slice);
RD_UT_ASSERT(r == rd_buf_len(&b),
"slice remains %"PRIusz", should be %"PRIusz,
r, rd_buf_len(&b));
for (pass = 0 ; pass < 2 ; pass++) {
/* Two passes:
* - pass 1: using peek()
* - pass 2: using read()
*/
const char *pass_str = pass == 0 ? "peek":"read";
crc = rd_crc32_init();
crc = rd_crc32_update(crc, (void *)&seed, sizeof(seed));
for (i = 0 ; i < max_cnt ; i++) {
uint32_t buf_crc;
crc = rd_crc32_update(crc, (void *)&i, sizeof(&i));
if (pass == 0)
r = rd_slice_peek(&slice, i * sizeof(buf_crc),
&buf_crc, sizeof(buf_crc));
else
r = rd_slice_read(&slice, &buf_crc,
sizeof(buf_crc));
RD_UT_ASSERT(r == sizeof(buf_crc),
"%s() at #%"PRIusz" failed: "
"r is %"PRIusz" not %"PRIusz,
pass_str, i, r, sizeof(buf_crc));
RD_UT_ASSERT(buf_crc == crc,
"%s: invalid crc at #%"PRIusz
": expected %"PRIu32", read %"PRIu32,
pass_str, i, crc, buf_crc);
}
read_crc = rd_crc32_finalize(crc);
RD_UT_ASSERT(read_crc == write_crc,
"%s: finalized read crc %"PRIu32
" != write crc %"PRIu32,
pass_str, read_crc, write_crc);
}
r = rd_slice_remains(&slice);
RD_UT_ASSERT(r == 0,
"slice remains %"PRIusz", should be %"PRIusz,
r, (size_t)0);
rd_buf_destroy(&b);
RD_UT_PASS();
}
#define do_unittest_iov_verify(...) do { \
int __fail = do_unittest_iov_verify0(__VA_ARGS__); \
RD_UT_ASSERT(!__fail, "iov_verify() failed"); \
} while (0)
static int do_unittest_iov_verify0 (rd_buf_t *b,
size_t exp_iovcnt, size_t exp_totsize) {
#define MY_IOV_MAX 16
struct iovec iov[MY_IOV_MAX];
size_t iovcnt;
size_t i;
size_t totsize, sum;
rd_assert(exp_iovcnt <= MY_IOV_MAX);
totsize = rd_buf_get_write_iov(b, iov, &iovcnt, MY_IOV_MAX, exp_totsize);
RD_UT_ASSERT(totsize >= exp_totsize,
"iov total size %"PRIusz" expected >= %"PRIusz,
totsize, exp_totsize);
RD_UT_ASSERT(iovcnt >= exp_iovcnt && iovcnt <= MY_IOV_MAX,
"iovcnt %"PRIusz
", expected %"PRIusz" < x <= MY_IOV_MAX",
iovcnt, exp_iovcnt);
sum = 0;
for (i = 0 ; i < iovcnt ; i++) {
RD_UT_ASSERT(iov[i].iov_base,
"iov #%"PRIusz" iov_base not set", i);
RD_UT_ASSERT(iov[i].iov_len,
"iov #%"PRIusz" iov_len %"PRIusz" out of range",
i, iov[i].iov_len);
sum += iov[i].iov_len;
RD_UT_ASSERT(sum <= totsize, "sum %"PRIusz" > totsize %"PRIusz,
sum, totsize);
}
RD_UT_ASSERT(sum == totsize,
"sum %"PRIusz" != totsize %"PRIusz,
sum, totsize);
return 0;
}
/**
* @brief Verify that buffer to iovec conversion works.
*/
static int do_unittest_write_iov (void) {
rd_buf_t b;
rd_buf_init(&b, 0, 0);
rd_buf_write_ensure(&b, 100, 100);
do_unittest_iov_verify(&b, 1, 100);
/* Add a secondary buffer */
rd_buf_write_ensure(&b, 30000, 0);
do_unittest_iov_verify(&b, 2, 100+30000);
rd_buf_destroy(&b);
RD_UT_PASS();
}
int unittest_rdbuf (void) {
int fails = 0;
fails += do_unittest_write_read();
fails += do_unittest_write_split_seek();
fails += do_unittest_write_read_payload_correctness();
fails += do_unittest_write_iov();
return fails;
}