blob: 5faad847910ad7cb8fcf5a8785ffcc97e5a15c92 [file] [log] [blame]
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2017 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "rd.h"
#include "rdkafka_int.h"
#include "rdkafka_msg.h"
#include "rdkafka_msgset.h"
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
#include "rdkafka_lz4.h"
#include "snappy.h"
#include "rdvarint.h"
#include "crc32c.h"
/**
 * @brief State used while constructing a single ProduceRequest that
 *        carries one MessageSet for one topic partition.
 *
 * Zeroed and set up by rd_kafka_msgset_writer_init(), filled with messages
 * by rd_kafka_msgset_writer_write_msgq() and completed (compression,
 * header fields, CRCs) by rd_kafka_msgset_writer_finalize().
 */
typedef struct rd_kafka_msgset_writer_s {
rd_kafka_buf_t *msetw_rkbuf; /* Backing store buffer (refcounted) */
int16_t msetw_ApiVersion; /* ProduceRequest ApiVersion */
int msetw_MsgVersion; /* MsgVersion (0, 1 or 2) to construct */
int msetw_features; /* Protocol features to use */
int msetw_msgcntmax; /* Max number of messages to send
* in a batch. */
size_t msetw_messages_len; /* Total size of Messages (possibly
* compressed), without the
* MessageSet header */
size_t msetw_MessageSetSize; /* Current MessageSetSize value */
size_t msetw_of_MessageSetSize; /* Buffer offset of the
* MessageSetSize field */
size_t msetw_of_start; /* Buffer offset of the MessageSet
* (v2: the BaseOffset field) */
int msetw_relative_offsets; /* Bool: use relative offsets.
* NOTE(review): set by init but not
* read by the code in this file. */
/* For MessageSet v2 */
int msetw_Attributes; /* MessageSet Attributes */
int64_t msetw_MaxTimestamp; /* Maximum timestamp in batch */
size_t msetw_of_CRC; /* Buffer offset of MessageSet.CRC */
/* First message information */
struct {
size_t of; /* rkbuf's first message position */
int64_t timestamp; /* First message's timestamp
* (v2 BaseTimestamp) */
} msetw_firstmsg;
rd_kafka_broker_t *msetw_rkb; /* @warning Not a refcounted
* reference! */
rd_kafka_toppar_t *msetw_rktp; /* @warning Not a refcounted
* reference! */
} rd_kafka_msgset_writer_t;
/**
 * @brief Pick the ProduceRequest ApiVersion and MsgVersion from the
 *        broker's feature set, preferring the newest supported format.
 *
 *   MSGVER2       -> ApiVersion 3, MsgVersion 2
 *   MSGVER1       -> ApiVersion 2, MsgVersion 1
 *   THROTTLETIME  -> ApiVersion 1, MsgVersion 0
 *   (none)        -> ApiVersion 0, MsgVersion 0
 *
 * The matched feature bit (if any) is OR:ed into msetw_features.
 *
 * @locality broker thread
 */
static RD_INLINE void
rd_kafka_msgset_writer_select_MsgVersion (rd_kafka_msgset_writer_t *msetw) {
        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
        int feature = rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER2;

        if (feature) {
                msetw->msetw_ApiVersion = 3;
                msetw->msetw_MsgVersion = 2;
                msetw->msetw_features |= feature;
                return;
        }

        feature = rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER1;
        if (feature) {
                msetw->msetw_ApiVersion = 2;
                msetw->msetw_MsgVersion = 1;
                msetw->msetw_features |= feature;
                return;
        }

        /* Legacy message format: only the ApiVersion differs depending
         * on whether the broker supports throttle time. */
        msetw->msetw_MsgVersion = 0;

        feature = rkb->rkb_features & RD_KAFKA_FEATURE_THROTTLETIME;
        if (feature) {
                msetw->msetw_ApiVersion = 1;
                msetw->msetw_features |= feature;
        } else {
                msetw->msetw_ApiVersion = 0;
        }
}
/**
 * @brief Allocate the ProduceRequest buffer for a writer whose ApiVersion,
 *        MsgVersion, msgcntmax and partition have already been set up.
 *
 * The allocation size is a worst-case estimate made of:
 *  - the ProduceRequest header,
 *  - the MessageSet header,
 *  - the per-message framing overhead times msetw_msgcntmax,
 *  - if msg_copy_max_size > 0: room for copying small payloads into the
 *    buffer, bounded by the number of bytes currently queued,
 * capped at message.max.bytes. The estimate only serves to avoid
 * unnecessary extra allocations: the buffer grows as needed if it is
 * too small.
 *
 * ProduceRequest headers go in one iovec:
 *   ProduceRequest v0..2:
 *     RequiredAcks + Timeout +
 *     [Topic + [Partition + MessageSetSize]]
 *   ProduceRequest v3:
 *     TransactionalId + RequiredAcks + Timeout +
 *     [Topic + [Partition + MessageSetSize + MessageSet]]
 */
static void
rd_kafka_msgset_writer_alloc_buf (rd_kafka_msgset_writer_t *msetw) {
        rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk;
        const int api_version = msetw->msetw_ApiVersion;
        const int msg_version = msetw->msetw_MsgVersion;
        size_t req_hdr_size = 0;     /* ProduceRequest header */
        size_t mset_hdr_size = 0;    /* MessageSet header */
        size_t per_msg_overhead = 0; /* Worst-case per-message framing */
        size_t alloc_size;

        rd_kafka_assert(NULL, !msetw->msetw_rkbuf);

        /* ProduceRequest header size */
        if (api_version >= 0 && api_version <= 3) {
                if (api_version == 3) /* TransactionalId prefix */
                        req_hdr_size += RD_KAFKAP_STR_SIZE(
                                rk->rk_eos.TransactionalId);

                req_hdr_size +=
                        2 /* RequiredAcks */ +
                        4 /* Timeout */ +
                        4 /* TopicCnt */ +
                        RD_KAFKAP_STR_SIZE(msetw->msetw_rktp->
                                           rktp_rkt->rkt_topic) +
                        4 /* PartitionCnt */ +
                        4 /* Partition */ +
                        4 /* MessageSetSize */;

                mset_hdr_size += 4; /* MessageSetSize */
        } else {
                RD_NOTREACHED();
        }

        /* MsgVersion specific sizes: worst-case per-message overhead
         * and, for v2, the MessageSet (RecordBatch) header. */
        if (msg_version == 0) {
                per_msg_overhead = RD_KAFKAP_MESSAGE_V0_OVERHEAD;
        } else if (msg_version == 1) {
                per_msg_overhead = RD_KAFKAP_MESSAGE_V1_OVERHEAD;
        } else if (msg_version == 2) {
                /* MsgVersion 2 uses varints: assume the worst case. */
                per_msg_overhead = RD_KAFKAP_MESSAGE_V2_OVERHEAD;

                mset_hdr_size +=
                        8 /* BaseOffset */ +
                        4 /* Length */ +
                        4 /* PartitionLeaderEpoch */ +
                        1 /* Magic (MsgVersion) */ +
                        4 /* CRC (CRC32C) */ +
                        2 /* Attributes */ +
                        4 /* LastOffsetDelta */ +
                        8 /* BaseTimestamp */ +
                        8 /* MaxTimestamp */ +
                        8 /* ProducerId */ +
                        2 /* ProducerEpoch */ +
                        4 /* BaseSequence */ +
                        4 /* RecordCount */;
        } else {
                RD_NOTREACHED();
        }

        alloc_size = req_hdr_size + mset_hdr_size;

        /* Reserve room for copying small payloads into the buffer. */
        if (rk->rk_conf.msg_copy_max_size > 0) {
                size_t queued_bytes = rd_kafka_msgq_size(
                        &msetw->msetw_rktp->rktp_xmit_msgq);
                size_t copy_space = (size_t)rk->rk_conf.msg_copy_max_size *
                        msetw->msetw_msgcntmax;

                alloc_size += RD_MIN(queued_bytes, copy_space);
        }

        /* Estimated per-message framing overhead */
        alloc_size += per_msg_overhead * msetw->msetw_msgcntmax;

        /* Never allocate more than message.max.bytes up front */
        if (alloc_size > (size_t)rk->rk_conf.max_msg_size)
                alloc_size = (size_t)rk->rk_conf.max_msg_size;

        /* Allocate iovecs to hold all headers and messages, plus auxiliary
         * space for message headers, etc. */
        msetw->msetw_rkbuf =
                rd_kafka_buf_new_request(msetw->msetw_rkb, RD_KAFKAP_Produce,
                                         msetw->msetw_msgcntmax/2 + 10,
                                         alloc_size);

        rd_kafka_buf_ApiVersion_set(msetw->msetw_rkbuf,
                                    msetw->msetw_ApiVersion,
                                    msetw->msetw_features);
}
/**
 * @brief Write the MessageSet v2 (RecordBatch) header with placeholder
 *        values for the fields that depend on the final message content.
 *
 * Length, CRC, Attributes, LastOffsetDelta, BaseTimestamp, MaxTimestamp and
 * RecordCount are written as 0 and patched later relative to
 * msetw_of_start / msetw_of_CRC, which this function records.
 * ProducerId and ProducerEpoch are taken from rk->rk_eos.
 *
 * @remark Must only be called for MsgVersion 2 (ApiVersion >= 3);
 *         both are asserted.
 * @remark The write order below is the wire format; do not reorder.
 */
static void
rd_kafka_msgset_writer_write_MessageSet_v2_header (
rd_kafka_msgset_writer_t *msetw) {
rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk;
rd_kafka_assert(NULL, msetw->msetw_ApiVersion >= 3);
rd_kafka_assert(NULL, msetw->msetw_MsgVersion == 2);
/* BaseOffset (also store the offset to the start of
 * the messageset header fields, used as the base for the
 * RD_KAFKAP_MSGSET_V2_OF_* field offsets when finalizing) */
msetw->msetw_of_start = rd_kafka_buf_write_i64(rkbuf, 0);
/* Length: updated later */
rd_kafka_buf_write_i32(rkbuf, 0);
/* PartitionLeaderEpoch (KIP-101) */
rd_kafka_buf_write_i32(rkbuf, 0);
/* Magic (MsgVersion) */
rd_kafka_buf_write_i8(rkbuf, msetw->msetw_MsgVersion);
/* CRC (CRC32C): updated later.
 * The CRC can only be computed after the entire messageset+messages
 * has been constructed and the following header fields updated.
 * Save the offset of this field so it can be updated later. */
msetw->msetw_of_CRC = rd_kafka_buf_write_i32(rkbuf, 0);
/* Attributes: updated later */
rd_kafka_buf_write_i16(rkbuf, 0);
/* LastOffsetDelta: updated later */
rd_kafka_buf_write_i32(rkbuf, 0);
/* BaseTimestamp: updated later */
rd_kafka_buf_write_i64(rkbuf, 0);
/* MaxTimestamp: updated later */
rd_kafka_buf_write_i64(rkbuf, 0);
/* ProducerId */
rd_kafka_buf_write_i64(rkbuf, rk->rk_eos.PID);
/* ProducerEpoch */
rd_kafka_buf_write_i16(rkbuf, rk->rk_eos.ProducerEpoch);
/* BaseSequence: always -1 here (presumably "no sequence",
 * i.e. no idempotence support yet — confirm) */
rd_kafka_buf_write_i32(rkbuf, -1);
/* RecordCount: updated later */
rd_kafka_buf_write_i32(rkbuf, 0);
}
/**
 * @brief Serialize the ProduceRequest header for the writer's single
 *        topic and partition, followed by the MessageSet v2 header when
 *        MsgVersion is 2.
 *
 * On return:
 *  - the buffer write position is where the first message goes,
 *  - msetw_of_MessageSetSize holds the offset of the MessageSetSize
 *    placeholder (finalized later),
 *  - msetw_MessageSetSize is set to the MessageSet header size.
 */
static void
rd_kafka_msgset_writer_write_Produce_header (rd_kafka_msgset_writer_t *msetw) {
        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
        rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk;
        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
        rd_kafka_itopic_t *rkt = rktp->rktp_rkt;

        /* ProduceRequest v3 starts with the TransactionalId */
        if (msetw->msetw_ApiVersion == 3)
                rd_kafka_buf_write_kstr(rkbuf, rk->rk_eos.TransactionalId);

        rd_kafka_buf_write_i16(rkbuf, rkt->rkt_conf.required_acks);
        rd_kafka_buf_write_i32(rkbuf, rkt->rkt_conf.request_timeout_ms);

        /* Exactly one topic ... */
        rd_kafka_buf_write_i32(rkbuf, 1);
        rd_kafka_buf_write_kstr(rkbuf, rkt->rkt_topic);

        /* ... with exactly one partition */
        rd_kafka_buf_write_i32(rkbuf, 1);
        rd_kafka_buf_write_i32(rkbuf, rktp->rktp_partition);

        /* MessageSetSize placeholder, patched when finalizing */
        msetw->msetw_of_MessageSetSize = rd_kafka_buf_write_i32(rkbuf, 0);

        if (msetw->msetw_MsgVersion != 2) {
                /* Older MessageSets have no separate header here */
                msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V0_SIZE;
                return;
        }

        rd_kafka_msgset_writer_write_MessageSet_v2_header(msetw);
        msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V2_SIZE;
}
/**
 * @brief Initialize a ProduceRequest MessageSet writer for the given
 *        broker and partition.
 *
 * If the partition's transmit queue is empty nothing is done and
 * \p msetw is left untouched. Otherwise \p msetw is zeroed, the protocol
 * versions are selected, a new buffer sized for the pending messages is
 * allocated and the Produce + MessageSet headers are written.
 *
 * @returns the maximum number of messages to write into the messageset
 *          (0 if there is nothing to send).
 *
 * @remark This currently constructs the entire ProduceRequest, containing
 *         a single outer MessageSet for a single partition.
 */
static int rd_kafka_msgset_writer_init (rd_kafka_msgset_writer_t *msetw,
                                        rd_kafka_broker_t *rkb,
                                        rd_kafka_toppar_t *rktp) {
        int queued_cnt = rd_atomic32_get(&rktp->rktp_xmit_msgq.rkmq_msg_cnt);

        if (!queued_cnt)
                return 0;

        memset(msetw, 0, sizeof(*msetw));
        msetw->msetw_rkb  = rkb;
        msetw->msetw_rktp = rktp;

        /* Batch limit: the smaller of what is queued and
         * batch.num.messages. */
        msetw->msetw_msgcntmax =
                RD_MIN(queued_cnt, rkb->rkb_rk->rk_conf.batch_num_messages);
        rd_dassert(msetw->msetw_msgcntmax > 0);

        rd_kafka_msgset_writer_select_MsgVersion(msetw);

        /* Relative (delta) offsets are always used by MsgVersion 2
         * (OffsetDelta), and by MsgVersion 1 only when compressing. */
        if (msetw->msetw_MsgVersion == 2 ||
            (msetw->msetw_MsgVersion == 1 &&
             rktp->rktp_rkt->rkt_conf.compression_codec))
                msetw->msetw_relative_offsets = 1;

        rd_kafka_msgset_writer_alloc_buf(msetw);

        rd_kafka_msgset_writer_write_Produce_header(msetw);

        /* Remember where the first message starts so the buffer can be
         * rewound to this point if the messages are compressed. */
        msetw->msetw_firstmsg.of =
                rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf);

        return msetw->msetw_msgcntmax;
}
/**
 * @brief Copy or link message payload to buffer.
 *
 * If the payload is no larger than msg_copy_max_size and the buffer still
 * has room for it, the payload is copied into the buffer. Otherwise a
 * reference to the payload memory is pushed onto the buffer.
 *
 * @param free_cb Optional destructor for rkm->rkm_payload (NULL if the
 *                caller keeps ownership).
 *                - Pushed by reference: free_cb is handed to
 *                  rd_kafka_buf_push() together with the memory
 *                  (presumably invoked when the buffer is destroyed).
 *                - Copied: the buffer does not reference the original
 *                  memory, so free_cb is invoked right away.
 */
static RD_INLINE void
rd_kafka_msgset_writer_write_msg_payload (rd_kafka_msgset_writer_t *msetw,
                                          const rd_kafka_msg_t *rkm,
                                          void (*free_cb)(void *)) {
        const rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk;
        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;

        if (rkm->rkm_len <= (size_t)rk->rk_conf.msg_copy_max_size &&
            rd_buf_write_remains(&rkbuf->rkbuf_buf) > rkm->rkm_len) {
                rd_kafka_buf_write(rkbuf,
                                   rkm->rkm_payload, rkm->rkm_len);
                /* The payload was copied, so the original memory is no
                 * longer referenced. Release it now: otherwise owned
                 * payloads (such as the compressed MessageSet wrapped by
                 * the MsgVersion 0/1 compression path, passed with
                 * rd_free) would leak. */
                if (free_cb)
                        free_cb(rkm->rkm_payload);
        } else {
                rd_kafka_buf_push(rkbuf, rkm->rkm_payload, rkm->rkm_len,
                                  free_cb);
        }
}
/**
 * @brief Write a single message to the messageset buffer using the
 *        MsgVersion 0 or 1 wire format.
 *
 * Layout: Offset(i64) MessageSize(i32) Crc(i32) MagicByte(i8)
 *         Attributes(i8) [Timestamp(i64), v1 only] Key(bytes) Value(bytes)
 *
 * @param Offset     value written to the per-message Offset field.
 * @param MsgAttributes value written to the Attributes byte.
 * @param free_cb    optional destructor for the payload, see
 *                   rd_kafka_msgset_writer_write_msg_payload().
 *
 * @returns the number of bytes written (Offset + MessageSize fields
 *          plus MessageSize).
 */
static size_t
rd_kafka_msgset_writer_write_msg_v0_1 (rd_kafka_msgset_writer_t *msetw,
rd_kafka_msg_t *rkm,
int64_t Offset,
int8_t MsgAttributes,
void (*free_cb)(void *)) {
rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
size_t MessageSize;
size_t of_Crc;
/*
 * MessageSet's (v0 and v1) per-Message header.
 */
/* Offset (only relevant for compressed messages on MsgVersion v1) */
rd_kafka_buf_write_i64(rkbuf, Offset);
/* MessageSize: everything after this field.
 * NOTE(review): assumes rkm_key_len / rkm_len are 0 when the
 * key / payload is NULL, otherwise this overstates the size. */
MessageSize =
4 + 1 + 1 + /* Crc+MagicByte+Attributes */
4 /* KeyLength */ + rkm->rkm_key_len +
4 /* ValueLength */ + rkm->rkm_len;
if (msetw->msetw_MsgVersion == 1)
MessageSize += 8; /* Timestamp i64 */
rd_kafka_buf_write_i32(rkbuf, (int32_t)MessageSize);
/*
 * Message
 */
/* Crc: placeholder, updated below once the message is written */
of_Crc = rd_kafka_buf_write_i32(rkbuf, 0);
/* Start Crc calculation of all buf writes.
 * NOTE(review): the payload may be pushed by reference rather than
 * written; this relies on rd_kafka_buf_push() also feeding the
 * running CRC — confirm. */
rd_kafka_buf_crc_init(rkbuf);
/* MagicByte */
rd_kafka_buf_write_i8(rkbuf, msetw->msetw_MsgVersion);
/* Attributes */
rd_kafka_buf_write_i8(rkbuf, MsgAttributes);
/* V1: Timestamp */
if (msetw->msetw_MsgVersion == 1)
rd_kafka_buf_write_i64(rkbuf, rkm->rkm_timestamp);
/* Message Key */
rd_kafka_buf_write_bytes(rkbuf, rkm->rkm_key, rkm->rkm_key_len);
/* Write or copy Value/payload; a NULL payload is encoded as a
 * NULL length (RD_KAFKAP_BYTES_LEN_NULL) with no data. */
if (rkm->rkm_payload) {
rd_kafka_buf_write_i32(rkbuf, (int32_t)rkm->rkm_len);
rd_kafka_msgset_writer_write_msg_payload(msetw, rkm, free_cb);
} else
rd_kafka_buf_write_i32(rkbuf, RD_KAFKAP_BYTES_LEN_NULL);
/* Finalize Crc and patch it into the placeholder */
rd_kafka_buf_update_u32(rkbuf, of_Crc,
rd_kafka_buf_crc_finalize(rkbuf));
/* Return written message size */
return 8/*Offset*/ + 4/*MessageSize*/ + MessageSize;
}
/**
 * @brief Write a single message (Record) to the messageset buffer using
 *        the MsgVersion 2 wire format.
 *
 * Layout: Length(varint) Attributes(i8) TimestampDelta(varint)
 *         OffsetDelta(varint) KeyLen(varint) Key ValueLen(varint) Value
 *         HeaderCount(varint, always 0: headers not implemented)
 *
 * TimestampDelta is relative to msetw_firstmsg.timestamp and OffsetDelta
 * is \p Offset as passed by the caller.
 *
 * @param MsgAttributes unused: the per-record Attributes byte is always 0.
 * @param free_cb optional destructor for the payload, see
 *                rd_kafka_msgset_writer_write_msg_payload().
 *
 * @returns the number of bytes written, including the Length varint.
 */
static size_t
rd_kafka_msgset_writer_write_msg_v2 (rd_kafka_msgset_writer_t *msetw,
rd_kafka_msg_t *rkm,
int64_t Offset,
int8_t MsgAttributes,
void (*free_cb)(void *)) {
rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
size_t MessageSize = 0;
/* Scratch buffers for the pre-encoded varints */
char varint_Length[RD_UVARINT_ENC_SIZEOF(int32_t)];
char varint_TimestampDelta[RD_UVARINT_ENC_SIZEOF(int64_t)];
char varint_OffsetDelta[RD_UVARINT_ENC_SIZEOF(int64_t)];
char varint_KeyLen[RD_UVARINT_ENC_SIZEOF(int32_t)];
char varint_ValueLen[RD_UVARINT_ENC_SIZEOF(int32_t)];
char varint_HeaderCount[RD_UVARINT_ENC_SIZEOF(int32_t)];
/* Encoded widths of the corresponding varints */
size_t sz_Length;
size_t sz_TimestampDelta;
size_t sz_OffsetDelta;
size_t sz_KeyLen;
size_t sz_ValueLen;
size_t sz_HeaderCount;
/* All varints, except for Length, need to be pre-built
 * so that the Length field can be set correctly and thus have
 * the correct varint encoded width. */
sz_TimestampDelta = rd_uvarint_enc_i64(
varint_TimestampDelta, sizeof(varint_TimestampDelta),
rkm->rkm_timestamp - msetw->msetw_firstmsg.timestamp);
sz_OffsetDelta = rd_uvarint_enc_i64(
varint_OffsetDelta, sizeof(varint_OffsetDelta), Offset);
/* NULL key/value are encoded as length RD_KAFKAP_BYTES_LEN_NULL */
sz_KeyLen = rd_uvarint_enc_i32(
varint_KeyLen, sizeof(varint_KeyLen),
rkm->rkm_key ? (int32_t)rkm->rkm_key_len :
(int32_t)RD_KAFKAP_BYTES_LEN_NULL);
sz_ValueLen = rd_uvarint_enc_i32(
varint_ValueLen, sizeof(varint_ValueLen),
rkm->rkm_payload ? (int32_t)rkm->rkm_len :
(int32_t)RD_KAFKAP_BYTES_LEN_NULL);
sz_HeaderCount = rd_uvarint_enc_i32(
varint_HeaderCount, sizeof(varint_HeaderCount), 0);
/* Calculate MessageSize without the width of Length itself (added
 * later), since that is the value stored in Length.
 * NOTE(review): rkm_key_len / rkm_len are added unconditionally, which
 * assumes they are 0 when the key / payload is NULL (the data is only
 * written when non-NULL below). */
MessageSize =
1 /* MsgAttributes */ +
sz_TimestampDelta +
sz_OffsetDelta +
sz_KeyLen +
rkm->rkm_key_len +
sz_ValueLen +
rkm->rkm_len +
sz_HeaderCount;
/* Length (encoded via the i64 encoder into the int32-sized buffer) */
sz_Length = rd_uvarint_enc_i64(varint_Length, sizeof(varint_Length),
MessageSize);
rd_kafka_buf_write(rkbuf, varint_Length, sz_Length);
MessageSize += sz_Length;
/* Attributes: The MsgAttributes argument is loosely based on MsgVer0
 * attributes, which don't apply to MsgVer2, so 0 is written. */
rd_kafka_buf_write_i8(rkbuf, 0);
/* TimestampDelta */
rd_kafka_buf_write(rkbuf, varint_TimestampDelta, sz_TimestampDelta);
/* OffsetDelta */
rd_kafka_buf_write(rkbuf, varint_OffsetDelta, sz_OffsetDelta);
/* KeyLen */
rd_kafka_buf_write(rkbuf, varint_KeyLen, sz_KeyLen);
/* Key (if any) */
if (rkm->rkm_key)
rd_kafka_buf_write(rkbuf, rkm->rkm_key, rkm->rkm_key_len);
/* ValueLen */
rd_kafka_buf_write(rkbuf, varint_ValueLen, sz_ValueLen);
/* Write or copy Value/payload */
if (rkm->rkm_payload)
rd_kafka_msgset_writer_write_msg_payload(msetw, rkm, free_cb);
/* HeaderCount (headers currently not implemented) */
rd_kafka_buf_write(rkbuf, varint_HeaderCount, sz_HeaderCount);
/* Return written message size */
return MessageSize;
}
/**
 * @brief Write one message to the messageset buffer using the writer for
 *        the selected MsgVersion.
 *
 * The CREATE_TIME timestamp-type attribute is OR:ed into \p MsgAttributes
 * when the message has a non-zero timestamp.
 * Asserts that the reported size does not exceed
 * rd_kafka_msg_wire_size(), and that it equals the number of bytes the
 * buffer's write position actually advanced.
 *
 * @returns the number of bytes written.
 */
static size_t
rd_kafka_msgset_writer_write_msg (rd_kafka_msgset_writer_t *msetw,
                                  rd_kafka_msg_t *rkm,
                                  int64_t Offset, int8_t MsgAttributes,
                                  void (*free_cb)(void *)) {
        rd_buf_t *rbuf = &msetw->msetw_rkbuf->rkbuf_buf;
        size_t start_pos;
        size_t outlen;

        if (likely(rkm->rkm_timestamp))
                MsgAttributes |= RD_KAFKA_MSG_ATTR_CREATE_TIME;

        start_pos = rd_buf_write_pos(rbuf);

        if (msetw->msetw_MsgVersion == 2)
                outlen = rd_kafka_msgset_writer_write_msg_v2(
                        msetw, rkm, Offset, MsgAttributes, free_cb);
        else /* MsgVersion 0 and 1 share a writer */
                outlen = rd_kafka_msgset_writer_write_msg_v0_1(
                        msetw, rkm, Offset, MsgAttributes, free_cb);

        rd_assert(outlen <=
                  rd_kafka_msg_wire_size(rkm, msetw->msetw_MsgVersion));
        rd_assert(outlen == rd_buf_write_pos(rbuf) - start_pos);

        return outlen;
}
/**
 * @brief Move as many messages as fit from \p rkmq into the messageset.
 *
 * Messages are dequeued from \p rkmq, enqueued on the buffer's message
 * queue and serialized, until the queue is empty, msetw_msgcntmax messages
 * have been written, or the next message's worst-case wire size would push
 * the buffer past message.max.bytes.
 *
 * Also records internal latency metrics per message, the first message's
 * timestamp (msetw_firstmsg.timestamp) and the maximum timestamp
 * (msetw_MaxTimestamp).
 *
 * @remark \p rkmq must not be empty (asserted).
 */
static void
rd_kafka_msgset_writer_write_msgq (rd_kafka_msgset_writer_t *msetw,
                                   rd_kafka_msgq_t *rkmq) {
        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
        size_t len = rd_buf_len(&msetw->msetw_rkbuf->rkbuf_buf);
        size_t max_msg_size = (size_t)msetw->msetw_rkb->rkb_rk->
                rk_conf.max_msg_size;
        rd_ts_t int_latency_base;
        rd_ts_t MaxTimestamp = 0;
        rd_kafka_msg_t *rkm;
        int msgcnt = 0;

        /* Internal latency calculation base.
         * Uses rkm_ts_timeout which is enqueue time + timeout.
         * The timeout is widened to rd_ts_t before the ms -> us scaling
         * so the multiplication cannot overflow int (undefined behaviour)
         * for large message.timeout.ms values. */
        int_latency_base = rd_clock() +
                ((rd_ts_t)rktp->rktp_rkt->rkt_conf.message_timeout_ms * 1000);

        /* Acquire BaseTimestamp from first message. */
        rkm = TAILQ_FIRST(&rkmq->rkmq_msgs);
        rd_kafka_assert(NULL, rkm);
        msetw->msetw_firstmsg.timestamp = rkm->rkm_timestamp;

        /*
         * Write as many messages as possible until buffer is full
         * or limit reached.
         */
        do {
                if (unlikely(msgcnt == msetw->msetw_msgcntmax ||
                             len + rd_kafka_msg_wire_size(rkm, msetw->
                                                          msetw_MsgVersion) >
                             max_msg_size)) {
                        rd_rkb_dbg(rkb, MSG, "PRODUCE",
                                   "No more space in current MessageSet "
                                   "(%i message(s), %"PRIusz" bytes)",
                                   msgcnt, len);
                        break;
                }

                /* Move message to buffer's queue */
                rd_kafka_msgq_deq(rkmq, rkm, 1);
                rd_kafka_msgq_enq(&rkbuf->rkbuf_msgq, rkm);

                /* Add internal latency metrics */
                rd_avg_add(&rkb->rkb_avg_int_latency,
                           int_latency_base - rkm->rkm_ts_timeout);

                /* MessageSet v2's .MaxTimestamp field */
                if (unlikely(MaxTimestamp < rkm->rkm_timestamp))
                        MaxTimestamp = rkm->rkm_timestamp;

                /* Write message to buffer; the message index doubles as
                 * the (relative) Offset. */
                len += rd_kafka_msgset_writer_write_msg(msetw, rkm, msgcnt, 0,
                                                        NULL);

                rd_dassert(len <= max_msg_size);
                msgcnt++;

        } while ((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs)));

        msetw->msetw_MaxTimestamp = MaxTimestamp;
}
#if WITH_ZLIB
/**
 * @brief Compress the messages in \p slice using gzip (zlib deflate with a
 *        gzip wrapper, windowBits 15+16).
 *
 * @param slice uncompressed input; it is read (consumed) by this call.
 * @param ciov  on success: iov_base is a rd_malloc()ed buffer owned by the
 *              caller and iov_len is the compressed size.
 *              On failure the output buffer has been freed.
 *
 * @returns 0 on success or -1 on failure. The failure is logged, and the
 *          caller is expected to send the messageset uncompressed.
 */
static int
rd_kafka_msgset_writer_compress_gzip (rd_kafka_msgset_writer_t *msetw,
                                      rd_slice_t *slice,
                                      struct iovec *ciov) {
        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
        z_stream strm;
        size_t len = rd_slice_remains(slice);
        const void *p;
        size_t rlen;
        int r;

        memset(&strm, 0, sizeof(strm));
        r = deflateInit2(&strm, Z_DEFAULT_COMPRESSION,
                         Z_DEFLATED, 15+16,
                         8, Z_DEFAULT_STRATEGY);
        if (r != Z_OK) {
                rd_rkb_log(rkb, LOG_ERR, "GZIP",
                           "Failed to initialize gzip for "
                           "compressing %"PRIusz" bytes in "
                           "topic %.*s [%"PRId32"]: %s (%i): "
                           "sending uncompressed",
                           len,
                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition,
                           strm.msg ? strm.msg : "", r);
                return -1;
        }

        /* Size the output buffer by deflateBound(), zlib's upper bound on
         * the compressed size of the whole input. */
        ciov->iov_len = deflateBound(&strm, (uLong)rd_slice_remains(slice));
        ciov->iov_base = rd_malloc(ciov->iov_len);

        strm.next_out = (void *)ciov->iov_base;
        strm.avail_out = (uInt)ciov->iov_len;

        /* Iterate through each segment and compress it. */
        while ((rlen = rd_slice_reader(slice, &p))) {
                strm.next_in = (void *)p;
                strm.avail_in = (uInt)rlen;

                /* Compress message.
                 * r must hold deflate()'s return code (not the result of
                 * the comparison) so the real zlib error is logged. */
                if ((r = deflate(&strm, Z_NO_FLUSH)) != Z_OK) {
                        rd_rkb_log(rkb, LOG_ERR, "GZIP",
                                   "Failed to gzip-compress "
                                   "%"PRIusz" bytes (%"PRIusz" total) for "
                                   "topic %.*s [%"PRId32"]: "
                                   "%s (%i): "
                                   "sending uncompressed",
                                   rlen, len,
                                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->
                                                    rkt_topic),
                                   rktp->rktp_partition,
                                   strm.msg ? strm.msg : "", r);
                        deflateEnd(&strm);
                        rd_free(ciov->iov_base);
                        return -1;
                }

                rd_kafka_assert(rkb->rkb_rk, strm.avail_in == 0);
        }

        /* Finish the compression */
        if ((r = deflate(&strm, Z_FINISH)) != Z_STREAM_END) {
                rd_rkb_log(rkb, LOG_ERR, "GZIP",
                           "Failed to finish gzip compression "
                           " of %"PRIusz" bytes for "
                           "topic %.*s [%"PRId32"]: "
                           "%s (%i): "
                           "sending uncompressed",
                           len,
                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition,
                           strm.msg ? strm.msg : "", r);
                deflateEnd(&strm);
                rd_free(ciov->iov_base);
                return -1;
        }

        ciov->iov_len = strm.total_out;

        /* Deinitialize compression */
        deflateEnd(&strm);

        return 0;
}
#endif
#if WITH_SNAPPY
/**
 * @brief Compress the messages in \p slice using Snappy.
 *
 * @param slice uncompressed input.
 * @param ciov  on success: iov_base is a rd_malloc()ed buffer owned by the
 *              caller; iov_len is updated by rd_kafka_snappy_compress_iov()
 *              (presumably to the compressed size — confirm).
 *              On failure the output buffer has been freed.
 *
 * @returns 0 on success or -1 on failure. The failure is logged, and the
 *          caller is expected to send the messageset uncompressed.
 */
static int
rd_kafka_msgset_writer_compress_snappy (rd_kafka_msgset_writer_t *msetw,
                                        rd_slice_t *slice, struct iovec *ciov) {
        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
        struct iovec *iov;
        size_t iov_max, iov_cnt;
        struct snappy_env senv;
        size_t len = rd_slice_remains(slice);
        int r;

        /* Initialize snappy compression environment */
        rd_kafka_snappy_init_env_sg(&senv, 1/*iov enable*/);

        /* Calculate maximum compressed size and
         * allocate an output buffer accordingly. */
        ciov->iov_len = rd_kafka_snappy_max_compressed_length(len);
        ciov->iov_base = rd_malloc(ciov->iov_len);

        /* Gather the slice into an iovec array (stack allocated, at most
         * one entry per buffer segment). */
        iov_max = slice->buf->rbuf_segment_cnt;
        iov = rd_alloca(sizeof(*iov) * iov_max);

        rd_slice_get_iov(slice, iov, &iov_cnt, iov_max, len);

        /* Compress each message */
        r = rd_kafka_snappy_compress_iov(&senv, iov, iov_cnt, len, ciov);

        /* Release the snappy environment on both the success and the
         * failure path so a failed compression does not leak it. */
        rd_kafka_snappy_free_env(&senv);

        if (r != 0) {
                rd_rkb_log(rkb, LOG_ERR, "SNAPPY",
                           "Failed to snappy-compress "
                           "%"PRIusz" bytes for "
                           "topic %.*s [%"PRId32"]: %s: "
                           "sending uncompressed",
                           len, RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition,
                           rd_strerror(-r));
                rd_free(ciov->iov_base);
                return -1;
        }

        return 0;
}
#endif
/**
 * @brief Compress the messages in \p slice using LZ4F.
 *
 * The LZ4 header-checksum flavour passed to rd_kafka_lz4_compress() is
 * "correct" (1) for MsgVersion >= 1 and "incorrect" (0) for MsgVersion 0
 * (presumably Kafka's legacy, broken LZ4 framing — confirm).
 *
 * @param ciov on success receives the compressed buffer and its length.
 * @returns 0 on success or -1 if compression failed.
 */
static int
rd_kafka_msgset_writer_compress_lz4 (rd_kafka_msgset_writer_t *msetw,
                                     rd_slice_t *slice, struct iovec *ciov) {
        int proper_hc = msetw->msetw_MsgVersion >= 1;

        if (rd_kafka_lz4_compress(msetw->msetw_rkb, proper_hc,
                                  slice, &ciov->iov_base, &ciov->iov_len))
                return -1;

        return 0;
}
/**
 * @brief Compress the message set.
 *
 * On success, the uncompressed messages starting at msetw_firstmsg.of are
 * replaced in the buffer by the compressed data:
 *  - MsgVersion 2: the compressed data is pushed as-is.
 *  - MsgVersion 0/1: the compressed data is wrapped in a single outer
 *    Message (Offset 0, Attributes = codec, timestamp of the first message).
 * The codec is also OR:ed into msetw_Attributes.
 *
 * @param outlenp in: total uncompressed messages size,
 *                out (on success): size of the compressed messages as
 *                written to the buffer (including any wrapper Message).
 * @returns 0 on success, or -1 if compression failed or was skipped
 *          (codec not supported by the broker, or the compressed output is
 *          larger than the input).
 * @remark Compression failures are not critical: the messageset is then
 *         sent uncompressed, and *outlenp and the buffer are left untouched.
 */
static int
rd_kafka_msgset_writer_compress (rd_kafka_msgset_writer_t *msetw,
                                 size_t *outlenp) {
        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
        rd_buf_t *rbuf = &msetw->msetw_rkbuf->rkbuf_buf;
        rd_slice_t slice;
        size_t len = *outlenp;
        struct iovec ciov = RD_ZERO_INIT; /* Compressed output buffer */
        int r;
        size_t outlen;

        rd_assert(rd_buf_len(rbuf) >= msetw->msetw_firstmsg.of + len);

        /* Create buffer slice from firstmsg and onwards */
        r = rd_slice_init(&slice, rbuf, msetw->msetw_firstmsg.of, len);
        rd_assert(r == 0 || !*"invalid firstmsg position");

        /* Assume failure until a codec reports success: r currently holds
         * rd_slice_init()'s 0, which must not be mistaken for a successful
         * compression if no case below handles the codec. */
        r = -1;

        switch (rktp->rktp_rkt->rkt_conf.compression_codec)
        {
#if WITH_ZLIB
        case RD_KAFKA_COMPRESSION_GZIP:
                r = rd_kafka_msgset_writer_compress_gzip(msetw, &slice, &ciov);
                break;
#endif

#if WITH_SNAPPY
        case RD_KAFKA_COMPRESSION_SNAPPY:
                r = rd_kafka_msgset_writer_compress_snappy(msetw, &slice,
                                                           &ciov);
                break;
#endif

        case RD_KAFKA_COMPRESSION_LZ4:
                /* Skip LZ4 compression if broker doesn't support it. */
                if (!(msetw->msetw_rkb->rkb_features & RD_KAFKA_FEATURE_LZ4))
                        return -1;

                r = rd_kafka_msgset_writer_compress_lz4(msetw, &slice, &ciov);
                break;

        default:
                rd_kafka_assert(NULL,
                                !*"notreached: unsupported compression.codec");
                break;
        }

        if (r == -1) /* Compression failed, send uncompressed */
                return -1;

        if (unlikely(ciov.iov_len > len)) {
                /* If the compressed data is larger than the uncompressed size
                 * then throw it away and send as uncompressed. */
                rd_free(ciov.iov_base);
                return -1;
        }

        /* Set compression codec in MessageSet.Attributes */
        msetw->msetw_Attributes |= rktp->rktp_rkt->rkt_conf.compression_codec;

        /* Rewind rkbuf to the pre-message checkpoint (firstmsg)
         * and replace the original message(s) with the compressed payload,
         * possibly with version dependent enveloping. */
        rd_buf_write_seek(rbuf, msetw->msetw_firstmsg.of);

        rd_kafka_assert(msetw->msetw_rkb->rkb_rk, ciov.iov_len < INT32_MAX);

        if (msetw->msetw_MsgVersion == 2) {
                /* MsgVersion 2 has no inner MessageSet header or wrapping
                 * for compressed messages, just the messages back-to-back,
                 * so we can push the compressed memory directly to the
                 * buffer without wrapping it. */
                rd_buf_push(rbuf, ciov.iov_base, ciov.iov_len, rd_free);
                outlen = ciov.iov_len;

        } else {
                /* Older MessageSets envelope/wrap the compressed MessageSet
                 * in an outer Message. Ownership of ciov.iov_base passes
                 * to the buffer via the rd_free free_cb. */
                rd_kafka_msg_t rkm = {
                        .rkm_len       = ciov.iov_len,
                        .rkm_payload   = ciov.iov_base,
                        .rkm_timestamp = msetw->msetw_firstmsg.timestamp
                };
                outlen = rd_kafka_msgset_writer_write_msg(
                        msetw, &rkm, 0,
                        rktp->rktp_rkt->rkt_conf.compression_codec,
                        rd_free/*free for ciov.iov_base*/);
        }

        *outlenp = outlen;

        return 0;
}
/**
 * @brief Compute the MessageSet v2 CRC (CRC32C) and patch it into the
 *        header's CRC field.
 *
 * The checksummed range starts right after the 4-byte CRC field
 * (msetw_of_CRC + 4) and runs to the current buffer write position, so
 * this must be called once all other header fields and all messages
 * are in place.
 */
static void
rd_kafka_msgset_writer_calc_crc_v2 (rd_kafka_msgset_writer_t *msetw) {
        rd_buf_t *rbuf = &msetw->msetw_rkbuf->rkbuf_buf;
        const size_t of_crc_data = msetw->msetw_of_CRC + 4;
        rd_slice_t crc_slice;
        int32_t crc;
        int err;

        err = rd_slice_init(&crc_slice, rbuf, of_crc_data,
                            rd_buf_write_pos(rbuf) - of_crc_data);
        rd_assert(!err && *"slice_init failed");

        crc = rd_slice_crc32c(&crc_slice);

        rd_kafka_buf_update_i32(msetw->msetw_rkbuf, msetw->msetw_of_CRC, crc);
}
/**
 * @brief Patch the MessageSet v2 header placeholders with their final
 *        values and then compute the CRC.
 *
 * Sets msetw_MessageSetSize to the v2 header size plus
 * msetw_messages_len, always flags CREATE_TIME in Attributes, and writes
 * Length, Attributes, LastOffsetDelta, BaseTimestamp, MaxTimestamp and
 * RecordCount at their offsets relative to msetw_of_start.
 *
 * @remark Requires at least one message in the buffer and ApiVersion >= 3
 *         (both asserted).
 */
static void
rd_kafka_msgset_writer_finalize_MessageSet_v2_header (
        rd_kafka_msgset_writer_t *msetw) {
        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
        const size_t base = msetw->msetw_of_start;
        int msgcnt = rd_kafka_msgq_len(&rkbuf->rkbuf_msgq);

        rd_kafka_assert(NULL, msgcnt > 0);
        rd_kafka_assert(NULL, msetw->msetw_ApiVersion >= 3);

        msetw->msetw_MessageSetSize =
                RD_KAFKAP_MSGSET_V2_SIZE + msetw->msetw_messages_len;
        msetw->msetw_Attributes |= RD_KAFKA_MSG_ATTR_CREATE_TIME;

        /* Length excludes the leading FirstOffset (8) and Length (4)
         * fields themselves. */
        rd_kafka_buf_update_i32(rkbuf, base + RD_KAFKAP_MSGSET_V2_OF_Length,
                                (int32_t)msetw->msetw_MessageSetSize -
                                (8+4));

        rd_kafka_buf_update_i16(rkbuf,
                                base + RD_KAFKAP_MSGSET_V2_OF_Attributes,
                                msetw->msetw_Attributes);

        rd_kafka_buf_update_i32(rkbuf,
                                base + RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta,
                                msgcnt-1);

        rd_kafka_buf_update_i64(rkbuf,
                                base + RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp,
                                msetw->msetw_firstmsg.timestamp);

        rd_kafka_buf_update_i64(rkbuf,
                                base + RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp,
                                msetw->msetw_MaxTimestamp);

        rd_kafka_buf_update_i32(rkbuf,
                                base + RD_KAFKAP_MSGSET_V2_OF_RecordCount,
                                msgcnt);

        /* The CRC covers the fields patched above, so it goes last. */
        rd_kafka_msgset_writer_calc_crc_v2(msetw);
}
/**
 * @brief Compute the final MessageSetSize (finalizing the v2 header when
 *        MsgVersion is 2) and patch it into the ProduceRequest.
 *
 * @remark msetw_messages_len must already hold the final (possibly
 *         compressed) size of the messages.
 */
static void
rd_kafka_msgset_writer_finalize_MessageSet (rd_kafka_msgset_writer_t *msetw) {
        rd_dassert(msetw->msetw_messages_len > 0);

        if (msetw->msetw_MsgVersion != 2)
                msetw->msetw_MessageSetSize =
                        RD_KAFKAP_MSGSET_V0_SIZE + msetw->msetw_messages_len;
        else
                rd_kafka_msgset_writer_finalize_MessageSet_v2_header(msetw);

        /* Patch the ProduceRequest's MessageSetSize placeholder */
        rd_kafka_buf_update_i32(msetw->msetw_rkbuf,
                                msetw->msetw_of_MessageSetSize,
                                (int32_t)msetw->msetw_MessageSetSize);
}
/**
 * @brief Finalize the messageset. Call this when no more messages are to
 *        be added.
 *
 * Compresses the messages if a compression codec is configured (best
 * effort: on failure they are sent uncompressed), then fills in the final
 * MessageSet header values, sizes and CRCs.
 *
 * @param MessageSetSizep set to the finalized MessageSetSize.
 *
 * @returns the buffer, ready to be transmitted, or NULL if no messages
 *          were added (in which case the buffer is destroyed).
 */
static rd_kafka_buf_t *
rd_kafka_msgset_writer_finalize (rd_kafka_msgset_writer_t *msetw,
                                 size_t *MessageSetSizep) {
        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
        int cnt = rd_kafka_msgq_len(&rkbuf->rkbuf_msgq);
        size_t len;

        if (unlikely(cnt == 0)) {
                rd_kafka_buf_destroy(rkbuf);
                return NULL;
        }

        /* Size of all messages written after the headers */
        len = rd_buf_write_pos(&rkbuf->rkbuf_buf) - msetw->msetw_firstmsg.of;
        rd_assert(len > 0);
        rd_assert(len <= (size_t)rktp->rktp_rkt->rkt_rk->rk_conf.max_msg_size);

        /* Best effort: on failure len is left as-is and the messages are
         * sent uncompressed. */
        if (rktp->rktp_rkt->rkt_conf.compression_codec)
                (void)rd_kafka_msgset_writer_compress(msetw, &len);

        msetw->msetw_messages_len = len;

        rd_kafka_msgset_writer_finalize_MessageSet(msetw);

        *MessageSetSizep = msetw->msetw_MessageSetSize;

        rd_rkb_dbg(msetw->msetw_rkb, MSG, "PRODUCE",
                   "%s [%"PRId32"]: "
                   "Produce MessageSet with %i message(s) (%"PRIusz" bytes, "
                   "ApiVersion %d, MsgVersion %d)",
                   rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
                   cnt, msetw->msetw_MessageSetSize,
                   msetw->msetw_ApiVersion, msetw->msetw_MsgVersion);

        return rkbuf;
}
/**
 * @brief Build a ProduceRequest holding as many messages from the
 *        toppar's transmit queue as fit, bounded by configuration
 *        (batch.num.messages, message.max.bytes).
 *
 * @param rkb             broker to create the request for.
 * @param rktp            toppar whose transmit queue is drained.
 * @param MessageSetSizep set to the final MessageSetSize.
 *
 * @returns the buffer to transmit, or NULL if there were no messages
 *          to put in the messageset.
 */
rd_kafka_buf_t *
rd_kafka_msgset_create_ProduceRequest (rd_kafka_broker_t *rkb,
                                       rd_kafka_toppar_t *rktp,
                                       size_t *MessageSetSizep) {
        rd_kafka_msgset_writer_t msetw;

        /* Nothing queued: no request. */
        if (!rd_kafka_msgset_writer_init(&msetw, rkb, rktp))
                return NULL;

        rd_kafka_msgset_writer_write_msgq(&msetw, &rktp->rktp_xmit_msgq);

        return rd_kafka_msgset_writer_finalize(&msetw, MessageSetSizep);
}