blob: 4f3f2f997273873ba92c318c9ec48b6c9b967557 [file] [log] [blame]
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012,2013 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include "rdsysqueue.h"
#include "rdkafka_proto.h"
/**
* @brief Message.MsgAttributes for MsgVersion v0..v1,
* also used for MessageSet.Attributes for MsgVersion v2.
*/
#define RD_KAFKA_MSG_ATTR_GZIP (1 << 0)
#define RD_KAFKA_MSG_ATTR_SNAPPY (1 << 1)
#define RD_KAFKA_MSG_ATTR_LZ4 (3)
#define RD_KAFKA_MSG_ATTR_COMPRESSION_MASK 0x3
#define RD_KAFKA_MSG_ATTR_CREATE_TIME (0 << 3)
#define RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME (1 << 3)
/**
* @brief MessageSet.Attributes for MsgVersion v2
*
* Attributes:
* -------------------------------------------------------------------------------------------------
* | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
* -------------------------------------------------------------------------------------------------
*/
/* Compression types same as MsgVersion 0 above */
/* Timestamp type same as MsgVersion 0 above */
#define RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL (1 << 4)
#define RD_KAFKA_MSGSET_V2_ATTR_CONTROL (1 << 5)
typedef struct rd_kafka_msg_s {
rd_kafka_message_t rkm_rkmessage; /* MUST be first field */
#define rkm_len rkm_rkmessage.len
#define rkm_payload rkm_rkmessage.payload
#define rkm_opaque rkm_rkmessage._private
#define rkm_partition rkm_rkmessage.partition
#define rkm_offset rkm_rkmessage.offset
#define rkm_key rkm_rkmessage.key
#define rkm_key_len rkm_rkmessage.key_len
#define rkm_err rkm_rkmessage.err
TAILQ_ENTRY(rd_kafka_msg_s) rkm_link;
int rkm_flags;
/* @remark These additional flags must not collide with
* the RD_KAFKA_MSG_F_* flags in rdkafka.h */
#define RD_KAFKA_MSG_F_FREE_RKM 0x10000 /* msg_t is allocated */
#define RD_KAFKA_MSG_F_ACCOUNT 0x20000 /* accounted for in curr_msgs */
int64_t rkm_timestamp; /* Message format V1.
* Meaning of timestamp depends on
* message Attribute LogAppendtime (broker)
* or CreateTime (producer).
* Unit is milliseconds since epoch (UTC).*/
rd_kafka_timestamp_type_t rkm_tstype; /* rkm_timestamp type */
union {
struct {
rd_ts_t ts_timeout; /* Message timeout */
rd_ts_t ts_enq; /* Enqueue/Produce time */
} producer;
#define rkm_ts_timeout rkm_u.producer.ts_timeout
#define rkm_ts_enq rkm_u.producer.ts_enq
} rkm_u;
} rd_kafka_msg_t;
TAILQ_HEAD(rd_kafka_msg_head_s, rd_kafka_msg_s);
/** @returns the absolute time a message was enqueued (producer) */
#define rd_kafka_msg_enq_time(rkm) ((rkm)->rkm_ts_enq)
/**
* @returns the message's total maximum on-wire size.
* @remark Depending on message version (MagicByte) the actual size
* may be smaller.
*/
static RD_INLINE RD_UNUSED
size_t rd_kafka_msg_wire_size (const rd_kafka_msg_t *rkm, int MsgVersion) {
static const size_t overheads[] = {
[0] = RD_KAFKAP_MESSAGE_V0_OVERHEAD,
[1] = RD_KAFKAP_MESSAGE_V1_OVERHEAD,
[2] = RD_KAFKAP_MESSAGE_V2_OVERHEAD
};
rd_dassert(MsgVersion >= 0 && MsgVersion <= 2);
return overheads[MsgVersion] + rkm->rkm_len + rkm->rkm_key_len;
}
/**
* @returns the enveloping rd_kafka_msg_t pointer for a rd_kafka_msg_t
* wrapped rd_kafka_message_t.
*/
static RD_INLINE RD_UNUSED
rd_kafka_msg_t *rd_kafka_message2msg (rd_kafka_message_t *rkmessage) {
return (rd_kafka_msg_t *)rkmessage;
}
typedef struct rd_kafka_msgq_s {
TAILQ_HEAD(, rd_kafka_msg_s) rkmq_msgs;
rd_atomic32_t rkmq_msg_cnt;
rd_atomic64_t rkmq_msg_bytes;
} rd_kafka_msgq_t;
#define RD_KAFKA_MSGQ_INITIALIZER(rkmq) \
{ .rkmq_msgs = TAILQ_HEAD_INITIALIZER((rkmq).rkmq_msgs) }
#define RD_KAFKA_MSGQ_FOREACH(elm,head) \
TAILQ_FOREACH(elm, &(head)->rkmq_msgs, rkm_link)
/**
* Returns the number of messages in the specified queue.
*/
static RD_INLINE RD_UNUSED int rd_kafka_msgq_len (rd_kafka_msgq_t *rkmq) {
return (int)rd_atomic32_get(&rkmq->rkmq_msg_cnt);
}
/**
* Returns the total number of bytes in the specified queue.
*/
static RD_INLINE RD_UNUSED size_t rd_kafka_msgq_size (rd_kafka_msgq_t *rkmq) {
return (size_t)rd_atomic64_get(&rkmq->rkmq_msg_bytes);
}
void rd_kafka_msg_destroy (rd_kafka_t *rk, rd_kafka_msg_t *rkm);
int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,
int msgflags,
char *payload, size_t len,
const void *keydata, size_t keylen,
void *msg_opaque);
static RD_INLINE RD_UNUSED void rd_kafka_msgq_init (rd_kafka_msgq_t *rkmq) {
TAILQ_INIT(&rkmq->rkmq_msgs);
rd_atomic32_init(&rkmq->rkmq_msg_cnt, 0);
rd_atomic64_init(&rkmq->rkmq_msg_bytes, 0);
}
/**
* Concat all elements of 'src' onto tail of 'dst'.
* 'src' will be cleared.
* Proper locks for 'src' and 'dst' must be held.
*/
static RD_INLINE RD_UNUSED void rd_kafka_msgq_concat (rd_kafka_msgq_t *dst,
rd_kafka_msgq_t *src) {
TAILQ_CONCAT(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link);
rd_atomic32_add(&dst->rkmq_msg_cnt, rd_atomic32_get(&src->rkmq_msg_cnt));
rd_atomic64_add(&dst->rkmq_msg_bytes, rd_atomic64_get(&src->rkmq_msg_bytes));
rd_kafka_msgq_init(src);
}
/**
* Move queue 'src' to 'dst' (overwrites dst)
* Source will be cleared.
*/
static RD_INLINE RD_UNUSED void rd_kafka_msgq_move (rd_kafka_msgq_t *dst,
rd_kafka_msgq_t *src) {
TAILQ_MOVE(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link);
rd_atomic32_set(&dst->rkmq_msg_cnt, rd_atomic32_get(&src->rkmq_msg_cnt));
rd_atomic64_set(&dst->rkmq_msg_bytes, rd_atomic64_get(&src->rkmq_msg_bytes));
rd_kafka_msgq_init(src);
}
/**
* rd_free all msgs in msgq and reinitialize the msgq.
*/
static RD_INLINE RD_UNUSED void rd_kafka_msgq_purge (rd_kafka_t *rk,
rd_kafka_msgq_t *rkmq) {
rd_kafka_msg_t *rkm, *next;
next = TAILQ_FIRST(&rkmq->rkmq_msgs);
while (next) {
rkm = next;
next = TAILQ_NEXT(next, rkm_link);
rd_kafka_msg_destroy(rk, rkm);
}
rd_kafka_msgq_init(rkmq);
}
/**
* Remove message from message queue
*/
static RD_INLINE RD_UNUSED
rd_kafka_msg_t *rd_kafka_msgq_deq (rd_kafka_msgq_t *rkmq,
rd_kafka_msg_t *rkm,
int do_count) {
if (likely(do_count)) {
rd_kafka_assert(NULL, rd_atomic32_get(&rkmq->rkmq_msg_cnt) > 0);
rd_kafka_assert(NULL, rd_atomic64_get(&rkmq->rkmq_msg_bytes) >= (int64_t)(rkm->rkm_len+rkm->rkm_key_len));
rd_atomic32_sub(&rkmq->rkmq_msg_cnt, 1);
rd_atomic64_sub(&rkmq->rkmq_msg_bytes,
rkm->rkm_len+rkm->rkm_key_len);
}
TAILQ_REMOVE(&rkmq->rkmq_msgs, rkm, rkm_link);
return rkm;
}
static RD_INLINE RD_UNUSED
rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq) {
rd_kafka_msg_t *rkm;
if (((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs))))
rd_kafka_msgq_deq(rkmq, rkm, 1);
return rkm;
}
/**
* Insert message at head of message queue.
*/
static RD_INLINE RD_UNUSED void rd_kafka_msgq_insert (rd_kafka_msgq_t *rkmq,
rd_kafka_msg_t *rkm) {
TAILQ_INSERT_HEAD(&rkmq->rkmq_msgs, rkm, rkm_link);
rd_atomic32_add(&rkmq->rkmq_msg_cnt, 1);
rd_atomic64_add(&rkmq->rkmq_msg_bytes, rkm->rkm_len+rkm->rkm_key_len);
}
/**
* Append message to tail of message queue.
*/
static RD_INLINE RD_UNUSED void rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq,
rd_kafka_msg_t *rkm) {
TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link);
rd_atomic32_add(&rkmq->rkmq_msg_cnt, 1);
rd_atomic64_add(&rkmq->rkmq_msg_bytes, rkm->rkm_len+rkm->rkm_key_len);
}
/**
* Scans a message queue for timed out messages and removes them from
* 'rkmq' and adds them to 'timedout', returning the number of timed out
* messages.
* 'timedout' must be initialized.
*/
int rd_kafka_msgq_age_scan (rd_kafka_msgq_t *rkmq,
rd_kafka_msgq_t *timedout,
rd_ts_t now);
int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,
int do_lock);
rd_kafka_message_t *rd_kafka_message_get (struct rd_kafka_op_s *rko);
rd_kafka_message_t *rd_kafka_message_get_from_rkm (struct rd_kafka_op_s *rko,
rd_kafka_msg_t *rkm);
rd_kafka_message_t *rd_kafka_message_new (void);