/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012,2013 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include "rdendian.h"
#include "rdvarint.h"
/*
* Kafka protocol definitions.
*/
#define RD_KAFKA_PORT 9092
#define RD_KAFKA_PORT_STR "9092"
/**
* Request types
*/
struct rd_kafkap_reqhdr {
int32_t Size;
int16_t ApiKey;
#define RD_KAFKAP_None -1
#define RD_KAFKAP_Produce 0
#define RD_KAFKAP_Fetch 1
#define RD_KAFKAP_Offset 2
#define RD_KAFKAP_Metadata 3
#define RD_KAFKAP_LeaderAndIsr 4
#define RD_KAFKAP_StopReplica 5
#define RD_KAFKAP_OffsetCommit 8
#define RD_KAFKAP_OffsetFetch 9
#define RD_KAFKAP_GroupCoordinator 10
#define RD_KAFKAP_JoinGroup 11
#define RD_KAFKAP_Heartbeat 12
#define RD_KAFKAP_LeaveGroup 13
#define RD_KAFKAP_SyncGroup 14
#define RD_KAFKAP_DescribeGroups 15
#define RD_KAFKAP_ListGroups 16
#define RD_KAFKAP_SaslHandshake 17
#define RD_KAFKAP_ApiVersion 18
#define RD_KAFKAP_CreateTopics 19
#define RD_KAFKAP_DeleteTopics 20
#define RD_KAFKAP_DeleteRecords 21
#define RD_KAFKAP_InitProducerId 22
#define RD_KAFKAP_OffsetForLeaderEpoch 23
#define RD_KAFKAP_AddPartitionsToTxn 24
#define RD_KAFKAP_AddOffsetsToTxn 25
#define RD_KAFKAP_EndTxn 26
#define RD_KAFKAP_WriteTxnMarkers 27
#define RD_KAFKAP_TxnOffsetCommit 28
#define RD_KAFKAP_DescribeAcls 29
#define RD_KAFKAP_CreateAcls 30
#define RD_KAFKAP_DeleteAcls 31
#define RD_KAFKAP_DescribeConfigs 32
#define RD_KAFKAP_AlterConfigs 33
#define RD_KAFKAP__NUM 34
int16_t ApiVersion;
int32_t CorrId;
/* ClientId follows */
};
#define RD_KAFKAP_REQHDR_SIZE (4+2+2+4)
#define RD_KAFKAP_RESHDR_SIZE (4+4)
/**
* Response header
*/
struct rd_kafkap_reshdr {
int32_t Size;
int32_t CorrId;
};
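/*
 * For reference (not a definition used by the code): the on-wire framing
 * described by the two headers above, all fields big-endian:
 *
 *   Request:  Size(4) ApiKey(2) ApiVersion(2) CorrId(4) ClientId(STRING) ...
 *   Response: Size(4) CorrId(4) ...
 *
 * where Size counts everything that follows the Size field itself.
 */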
static RD_UNUSED
const char *rd_kafka_ApiKey2str (int16_t ApiKey) {
static const char *names[] = {
[RD_KAFKAP_Produce] = "Produce",
[RD_KAFKAP_Fetch] = "Fetch",
[RD_KAFKAP_Offset] = "Offset",
[RD_KAFKAP_Metadata] = "Metadata",
[RD_KAFKAP_LeaderAndIsr] = "LeaderAndIsr",
[RD_KAFKAP_StopReplica] = "StopReplica",
[RD_KAFKAP_OffsetCommit] = "OffsetCommit",
[RD_KAFKAP_OffsetFetch] = "OffsetFetch",
[RD_KAFKAP_GroupCoordinator] = "GroupCoordinator",
[RD_KAFKAP_JoinGroup] = "JoinGroup",
[RD_KAFKAP_Heartbeat] = "Heartbeat",
[RD_KAFKAP_LeaveGroup] = "LeaveGroup",
[RD_KAFKAP_SyncGroup] = "SyncGroup",
[RD_KAFKAP_DescribeGroups] = "DescribeGroups",
[RD_KAFKAP_ListGroups] = "ListGroups",
[RD_KAFKAP_SaslHandshake] = "SaslHandshake",
[RD_KAFKAP_ApiVersion] = "ApiVersion",
[RD_KAFKAP_CreateTopics] = "CreateTopics",
[RD_KAFKAP_DeleteTopics] = "DeleteTopics",
[RD_KAFKAP_DeleteRecords] = "DeleteRecords",
[RD_KAFKAP_InitProducerId] = "InitProducerId",
[RD_KAFKAP_OffsetForLeaderEpoch] = "OffsetForLeaderEpoch",
[RD_KAFKAP_AddPartitionsToTxn] = "AddPartitionsToTxn",
[RD_KAFKAP_AddOffsetsToTxn] = "AddOffsetsToTxn",
[RD_KAFKAP_EndTxn] = "EndTxn",
[RD_KAFKAP_WriteTxnMarkers] = "WriteTxnMarkers",
[RD_KAFKAP_TxnOffsetCommit] = "TxnOffsetCommit",
[RD_KAFKAP_DescribeAcls] = "DescribeAcls",
[RD_KAFKAP_CreateAcls] = "CreateAcls",
[RD_KAFKAP_DeleteAcls] = "DeleteAcls",
[RD_KAFKAP_DescribeConfigs] = "DescribeConfigs",
[RD_KAFKAP_AlterConfigs] = "AlterConfigs"
};
static RD_TLS char ret[32];
/* Unknown ApiKey, or a gap in the names[] array (e.g. ApiKeys 6 and 7) */
if (ApiKey < 0 || ApiKey >= (int)RD_ARRAYSIZE(names) ||
    !names[ApiKey]) {
rd_snprintf(ret, sizeof(ret), "Unknown-%hd?", ApiKey);
return ret;
}
return names[ApiKey];
}
/**
* @brief ApiKey version support tuple.
*/
struct rd_kafka_ApiVersion {
int16_t ApiKey;
int16_t MinVer;
int16_t MaxVer;
};
/**
* @brief ApiVersion.ApiKey comparator.
*/
static RD_UNUSED int rd_kafka_ApiVersion_key_cmp (const void *_a, const void *_b) {
const struct rd_kafka_ApiVersion *a = _a, *b = _b;
return a->ApiKey - b->ApiKey;
}
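/*
 * Example (illustrative only): looking up the supported version range for
 * an ApiKey with bsearch() in an array sorted on ApiKey ('apis' and
 * 'api_cnt' are hypothetical variables):
 *
 *   struct rd_kafka_ApiVersion skel = { RD_KAFKAP_Fetch, 0, 0 };
 *   struct rd_kafka_ApiVersion *match =
 *           bsearch(&skel, apis, api_cnt, sizeof(*apis),
 *                   rd_kafka_ApiVersion_key_cmp);
 */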
#define RD_KAFKAP_READ_UNCOMMITTED 0
#define RD_KAFKAP_READ_COMMITTED 1
/**
*
* Kafka protocol string representation prefixed with a convenience header
*
* Serialized format:
* { int16, data.. }
*
*/
typedef struct rd_kafkap_str_s {
/* convenience header (aligned access, host endian) */
int len; /* Kafka string length (-1=NULL, 0=empty, >0=string) */
const char *str; /* points into data[] or other memory,
* not NULL-terminated */
} rd_kafkap_str_t;
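/*
 * Example (illustrative) of the serialized format:
 *   "foo"         ->  00 03 'f' 'o' 'o'   (5 bytes)
 *   empty string  ->  00 00               (2 bytes)
 *   NULL string   ->  ff ff               (2 bytes, length -1)
 */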
#define RD_KAFKAP_STR_LEN_NULL -1
#define RD_KAFKAP_STR_IS_NULL(kstr) ((kstr)->len == RD_KAFKAP_STR_LEN_NULL)
/* Returns the string length of a Kafka protocol string representation
 * (0 for NULL strings) */
#define RD_KAFKAP_STR_LEN0(len) ((len) == RD_KAFKAP_STR_LEN_NULL ? 0 : (len))
#define RD_KAFKAP_STR_LEN(kstr) RD_KAFKAP_STR_LEN0((kstr)->len)
/* Returns the actual size of a kafka protocol string representation. */
#define RD_KAFKAP_STR_SIZE0(len) (2 + RD_KAFKAP_STR_LEN0(len))
#define RD_KAFKAP_STR_SIZE(kstr) RD_KAFKAP_STR_SIZE0((kstr)->len)
/* Serialized Kafka string: only works for _new() kstrs */
#define RD_KAFKAP_STR_SER(kstr) ((kstr)+1)
/* Macro suitable for "%.*s" printing. */
#define RD_KAFKAP_STR_PR(kstr) \
(int)((kstr)->len == RD_KAFKAP_STR_LEN_NULL ? 0 : (kstr)->len), \
(kstr)->str
/* strndupa() a Kafka string */
#define RD_KAFKAP_STR_DUPA(destptr,kstr) \
rd_strndupa((destptr), (kstr)->str, RD_KAFKAP_STR_LEN(kstr))
/* strndup() a Kafka string */
#define RD_KAFKAP_STR_DUP(kstr) rd_strndup((kstr)->str, RD_KAFKAP_STR_LEN(kstr))
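/*
 * Example (illustrative, 'kstr' is a hypothetical rd_kafkap_str_t *):
 * kstr->str is not necessarily NUL-terminated, so either print it with
 * the "%.*s" helper or make a NUL-terminated copy:
 *
 *   printf("topic %.*s\n", RD_KAFKAP_STR_PR(kstr));
 *
 *   char *cstr = RD_KAFKAP_STR_DUP(kstr);
 *   ...
 *   rd_free(cstr);
 */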
/**
* Frees a Kafka string previously allocated with `rd_kafkap_str_new()`
*/
static RD_UNUSED void rd_kafkap_str_destroy (rd_kafkap_str_t *kstr) {
rd_free(kstr);
}
/**
* Allocate a new Kafka string and make a copy of 'str'.
* If 'len' is -1 the length will be calculated.
* Supports Kafka NULL strings.
* Nul-terminates the string, but the trailing \0 is not part of
* the serialized string.
*/
static RD_INLINE RD_UNUSED
rd_kafkap_str_t *rd_kafkap_str_new (const char *str, int len) {
rd_kafkap_str_t *kstr;
int16_t klen;
if (!str)
len = RD_KAFKAP_STR_LEN_NULL;
else if (len == -1)
len = (int)strlen(str);
kstr = rd_malloc(sizeof(*kstr) + 2 +
(len == RD_KAFKAP_STR_LEN_NULL ? 0 : len + 1));
kstr->len = len;
/* Serialised format: 16-bit string length */
klen = htobe16(len);
memcpy(kstr+1, &klen, 2);
/* Serialised format: non null-terminated string */
if (len == RD_KAFKAP_STR_LEN_NULL)
kstr->str = NULL;
else {
kstr->str = ((const char *)(kstr+1))+2;
memcpy((void *)kstr->str, str, len);
((char *)kstr->str)[len] = '\0';
}
return kstr;
}
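/*
 * Example (illustrative): a Kafka string created from a C string.
 *
 *   rd_kafkap_str_t *kstr = rd_kafkap_str_new("my_topic", -1);
 *   // kstr->len == 8, RD_KAFKAP_STR_SIZE(kstr) == 10, and
 *   // RD_KAFKAP_STR_SER(kstr) points to: 00 08 'm' 'y' '_' 't' 'o' 'p' 'i' 'c'
 *   rd_kafkap_str_destroy(kstr);
 */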
/**
* Makes a copy of `src`. The copy is fully allocated and should
* be freed with rd_kafkap_str_destroy().
*/
static RD_INLINE RD_UNUSED
rd_kafkap_str_t *rd_kafkap_str_copy (const rd_kafkap_str_t *src) {
return rd_kafkap_str_new(src->str, src->len);
}
static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp (const rd_kafkap_str_t *a,
const rd_kafkap_str_t *b) {
int minlen = RD_MIN(a->len, b->len);
int r = memcmp(a->str, b->str, minlen);
if (r)
return r;
else
return a->len - b->len;
}
static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str (const rd_kafkap_str_t *a,
const char *str) {
int len = (int)strlen(str);
int minlen = RD_MIN(a->len, len);
int r = memcmp(a->str, str, minlen);
if (r)
return r;
else
return a->len - len;
}
static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str2 (const char *str,
const rd_kafkap_str_t *b){
int len = (int)strlen(str);
int minlen = RD_MIN(b->len, len);
int r = memcmp(str, b->str, minlen);
if (r)
return r;
else
return len - b->len;
}
/**
*
* Kafka protocol bytes array representation prefixed with a convenience header
*
* Serialized format:
* { int32, data.. }
*
*/
typedef struct rd_kafkap_bytes_s {
/* convenience header (aligned access, host endian) */
int32_t len; /* Kafka bytes length (-1=NULL, 0=empty, >0=data) */
const void *data; /* points just past the struct, or other memory,
* not NULL-terminated */
const char _data[1]; /* Bytes following struct when new()ed */
} rd_kafkap_bytes_t;
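/*
 * Example (illustrative) of the serialized format:
 *   {0x01,0x02,0x03}  ->  00 00 00 03 01 02 03   (7 bytes)
 *   empty bytes       ->  00 00 00 00            (4 bytes)
 *   NULL bytes        ->  ff ff ff ff            (4 bytes, length -1)
 */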
#define RD_KAFKAP_BYTES_LEN_NULL -1
#define RD_KAFKAP_BYTES_IS_NULL(kbytes) \
((kbytes)->len == RD_KAFKAP_BYTES_LEN_NULL)
/* Returns the data length of a Kafka protocol bytes representation
 * (0 for NULL bytes) */
#define RD_KAFKAP_BYTES_LEN0(len) ((len) == RD_KAFKAP_BYTES_LEN_NULL ? 0:(len))
#define RD_KAFKAP_BYTES_LEN(kbytes) RD_KAFKAP_BYTES_LEN0((kbytes)->len)
/* Returns the actual size of a kafka protocol bytes representation. */
#define RD_KAFKAP_BYTES_SIZE0(len) (4 + RD_KAFKAP_BYTES_LEN0(len))
#define RD_KAFKAP_BYTES_SIZE(kbytes) RD_KAFKAP_BYTES_SIZE0((kbytes)->len)
/* Serialized Kafka bytes: only works for _new() kbytes */
#define RD_KAFKAP_BYTES_SER(kbytes) ((kbytes)+1)
/**
* Frees a Kafka bytes previously allocated with `rd_kafkap_bytes_new()`
*/
static RD_UNUSED void rd_kafkap_bytes_destroy (rd_kafkap_bytes_t *kbytes) {
rd_free(kbytes);
}
/**
* @brief Allocate a new Kafka bytes object and make a copy of \p bytes.
* If \p len > 0 but \p bytes is NULL no copying is performed, but
* the bytes structure will be allocated to fit \p len bytes.
*
* Supports:
* - Kafka NULL bytes (bytes==NULL,len==0),
* - Empty bytes (bytes!=NULL,len==0)
* - Copy data (bytes!=NULL,len>0)
* - No-copy, just alloc (bytes==NULL,len>0)
*/
static RD_INLINE RD_UNUSED
rd_kafkap_bytes_t *rd_kafkap_bytes_new (const char *bytes, int32_t len) {
rd_kafkap_bytes_t *kbytes;
int32_t klen;
if (!bytes && !len)
len = RD_KAFKAP_BYTES_LEN_NULL;
kbytes = rd_malloc(sizeof(*kbytes) + 4 +
(len == RD_KAFKAP_BYTES_LEN_NULL ? 0 : len));
kbytes->len = len;
klen = htobe32(len);
memcpy(kbytes+1, &klen, 4);
if (len == RD_KAFKAP_BYTES_LEN_NULL)
kbytes->data = NULL;
else {
kbytes->data = ((const char *)(kbytes+1))+4;
if (bytes)
memcpy((void *)kbytes->data, bytes, len);
}
return kbytes;
}
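/*
 * Example (illustrative): the no-copy mode ('bytes'==NULL, 'len'>0) is
 * useful when the payload is written after allocation ('payload' and
 * 'payload_len' are hypothetical):
 *
 *   rd_kafkap_bytes_t *kbytes = rd_kafkap_bytes_new(NULL, payload_len);
 *   memcpy((void *)kbytes->data, payload, payload_len);
 *   ...
 *   rd_kafkap_bytes_destroy(kbytes);
 */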
/**
* Makes a copy of `src`. The copy will be fully allocated and should
* be freed with rd_kafkap_bytes_destroy()
*/
static RD_INLINE RD_UNUSED
rd_kafkap_bytes_t *rd_kafkap_bytes_copy (const rd_kafkap_bytes_t *src) {
return rd_kafkap_bytes_new(src->data, src->len);
}
static RD_INLINE RD_UNUSED int rd_kafkap_bytes_cmp (const rd_kafkap_bytes_t *a,
const rd_kafkap_bytes_t *b) {
int minlen = RD_MIN(a->len, b->len);
int r = memcmp(a->data, b->data, minlen);
if (r)
return r;
else
return a->len - b->len;
}
static RD_INLINE RD_UNUSED
int rd_kafkap_bytes_cmp_data (const rd_kafkap_bytes_t *a,
const char *data, int len) {
int minlen = RD_MIN(a->len, len);
int r = memcmp(a->data, data, minlen);
if (r)
return r;
else
return a->len - len;
}
typedef struct rd_kafka_buf_s rd_kafka_buf_t;
#define RD_KAFKA_NODENAME_SIZE 128
/**
* @brief Message overheads (worst-case)
*/
/**
* MsgVersion v0..v1
*/
/* Offset + MessageSize */
#define RD_KAFKAP_MESSAGESET_V0_HDR_SIZE (8+4)
/* CRC + Magic + Attr + KeyLen + ValueLen */
#define RD_KAFKAP_MESSAGE_V0_HDR_SIZE (4+1+1+4+4)
/* CRC + Magic + Attr + Timestamp + KeyLen + ValueLen */
#define RD_KAFKAP_MESSAGE_V1_HDR_SIZE (4+1+1+8+4+4)
/* Maximum per-message overhead */
#define RD_KAFKAP_MESSAGE_V0_OVERHEAD \
(RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V0_HDR_SIZE)
#define RD_KAFKAP_MESSAGE_V1_OVERHEAD \
(RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V1_HDR_SIZE)
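/* For reference: the v0 worst case works out to (8+4) + (4+1+1+4+4) = 26
 * bytes per message and the v1 worst case to (8+4) + (4+1+1+8+4+4) = 34
 * bytes, excluding the key and value payloads. */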
/**
* MsgVersion v2
*/
#define RD_KAFKAP_MESSAGE_V2_OVERHEAD \
( \
/* Length (varint) */ \
RD_UVARINT_ENC_SIZEOF(int32_t) + \
/* Attributes */ \
1 + \
/* TimestampDelta (varint) */ \
RD_UVARINT_ENC_SIZEOF(int64_t) + \
/* OffsetDelta (varint) */ \
RD_UVARINT_ENC_SIZEOF(int32_t) + \
/* KeyLen (varint) */ \
RD_UVARINT_ENC_SIZEOF(int32_t) + \
/* ValueLen (varint) */ \
RD_UVARINT_ENC_SIZEOF(int32_t) + \
/* HeaderCnt (varint): */ \
RD_UVARINT_ENC_SIZEOF(int32_t) \
)
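/*
 * For reference: assuming RD_UVARINT_ENC_SIZEOF() yields the worst-case
 * varint size for its type (5 bytes for 32-bit, 10 bytes for 64-bit),
 * this works out to 5+1+10+5+5+5+5 = 36 bytes per record, excluding the
 * key, value and header payloads.
 */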
/**
* @brief MessageSets are not explicitly versioned but depend on the
* Produce/Fetch API version and the enclosed Message versions.
* We use the Message version (MsgVersion, aka MagicByte) to describe
* the MessageSet version, that is, MsgVersion <= 1 uses the legacy
* MessageSet format while MsgVersion 2 uses MessageSet version v2.
*/
/* Old MessageSet header: none */
#define RD_KAFKAP_MSGSET_V0_SIZE 0
/* MessageSet v2 header */
#define RD_KAFKAP_MSGSET_V2_SIZE (8+4+4+1+4+2+4+8+8+8+2+4+4)
/* Byte offsets for MessageSet fields */
#define RD_KAFKAP_MSGSET_V2_OF_Length (8)
#define RD_KAFKAP_MSGSET_V2_OF_CRC (8+4+4+1)
#define RD_KAFKAP_MSGSET_V2_OF_Attributes (8+4+4+1+4)
#define RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta (8+4+4+1+4+2)
#define RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp (8+4+4+1+4+2+4)
#define RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp (8+4+4+1+4+2+4+8)
#define RD_KAFKAP_MSGSET_V2_OF_RecordCount (8+4+4+1+4+2+4+8+8+8+2+4)
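/*
 * For reference: the MessageSet v2 (RecordBatch) header layout that the
 * sizes and offsets above are derived from (field: size in bytes @ offset):
 *
 *   BaseOffset:           8  @  0
 *   Length:               4  @  8
 *   PartitionLeaderEpoch: 4  @ 12
 *   Magic:                1  @ 16
 *   CRC:                  4  @ 17
 *   Attributes:           2  @ 21
 *   LastOffsetDelta:      4  @ 23
 *   BaseTimestamp:        8  @ 27
 *   MaxTimestamp:         8  @ 35
 *   ProducerId:           8  @ 43
 *   ProducerEpoch:        2  @ 51
 *   BaseSequence:         4  @ 53
 *   RecordCount:          4  @ 57
 *
 * for a total header size of 61 bytes (RD_KAFKAP_MSGSET_V2_SIZE).
 */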