/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012,2013 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* This file implements the consumer offset storage.
* It currently supports local file storage and broker OffsetCommit storage,
* but not ZooKeeper.
*
* Regardless of the commit method (file, broker, ..) it works like this:
* - rd_kafka_offset_store() is called with an offset to store, either by
* rdkafka itself or by the application, depending on whether automatic
* offset storing/committing is enabled. All it does is set
* rktp->rktp_stored_offset to that value.
* This may happen from any thread and is protected by the rktp lock.
* - The actual commit/write of the offset to its backing store (filesystem)
* is performed by the main rdkafka thread and scheduled at the configured
* auto.commit.interval.ms interval.
* - The write is performed in the main rdkafka thread (in a blocking manner
* for file-based offsets) and once the write has succeeded
* rktp->rktp_committed_offset is updated to the new value.
* - If offset.store.sync.interval.ms is configured the main rdkafka thread
* will also fsync() each offset file at that interval. (file)
*/
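/*
* Example (illustrative sketch, not part of the implementation): enabling
* file-based offset storage with automatic commits for the legacy (simple)
* consumer through topic configuration. Property values are hypothetical
* and error handling is omitted.
*
*   rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();
*   char errstr[512];
*   rd_kafka_topic_conf_set(tconf, "offset.store.method", "file",
*                           errstr, sizeof(errstr));
*   rd_kafka_topic_conf_set(tconf, "offset.store.path", "/var/lib/myapp",
*                           errstr, sizeof(errstr));
*   rd_kafka_topic_conf_set(tconf, "auto.commit.enable", "true",
*                           errstr, sizeof(errstr));
*   rd_kafka_topic_conf_set(tconf, "auto.commit.interval.ms", "5000",
*                           errstr, sizeof(errstr));
*/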
#include "rdkafka_int.h"
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
#include "rdkafka_offset.h"
#include "rdkafka_broker.h"
#include <stdio.h>
#include <sys/types.h>
#include <fcntl.h>
#ifdef _MSC_VER
#include <io.h>
#include <share.h>
#include <sys/stat.h>
#include <Shlwapi.h>
typedef int mode_t;
#endif
/**
* Convert an absolute or logical offset to string.
*/
const char *rd_kafka_offset2str (int64_t offset) {
static RD_TLS char ret[16][32];
static RD_TLS int i = 0;
i = (i + 1) % 16;
if (offset >= 0)
rd_snprintf(ret[i], sizeof(ret[i]), "%"PRId64, offset);
else if (offset == RD_KAFKA_OFFSET_BEGINNING)
return "BEGINNING";
else if (offset == RD_KAFKA_OFFSET_END)
return "END";
else if (offset == RD_KAFKA_OFFSET_STORED)
return "STORED";
else if (offset == RD_KAFKA_OFFSET_INVALID)
return "INVALID";
else if (offset <= RD_KAFKA_OFFSET_TAIL_BASE)
rd_snprintf(ret[i], sizeof(ret[i]), "TAIL(%lld)",
llabs(offset - RD_KAFKA_OFFSET_TAIL_BASE));
else
rd_snprintf(ret[i], sizeof(ret[i]), "%"PRId64"?", offset);
return ret[i];
}
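/*
* Usage sketch for rd_kafka_offset2str() (illustrative only):
*
*   printf("resuming at %s\n", rd_kafka_offset2str(RD_KAFKA_OFFSET_STORED));
*   printf("resuming at %s\n", rd_kafka_offset2str(1234));   // "1234"
*
* The returned string lives in a small ring of thread-local buffers, so at
* most 16 results per thread may be referenced at the same time.
*/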
static void rd_kafka_offset_file_close (rd_kafka_toppar_t *rktp) {
if (!rktp->rktp_offset_fp)
return;
fclose(rktp->rktp_offset_fp);
rktp->rktp_offset_fp = NULL;
}
#ifndef _MSC_VER
/**
* Linux version of the open callback, providing race-free CLOEXEC
* (via O_CLOEXEC when available).
*/
int rd_kafka_open_cb_linux (const char *pathname, int flags, mode_t mode,
void *opaque) {
#ifdef O_CLOEXEC
return open(pathname, flags|O_CLOEXEC, mode);
#else
return rd_kafka_open_cb_generic(pathname, flags, mode, opaque);
#endif
}
#endif
/**
* Fallback version of open_cb that does NOT provide race-free CLOEXEC,
* but sets CLOEXEC after the file has been opened (if FD_CLOEXEC is defined).
*/
int rd_kafka_open_cb_generic (const char *pathname, int flags, mode_t mode,
void *opaque) {
#ifndef _MSC_VER
int fd;
fd = open(pathname, flags, mode);
if (fd == -1)
return -1;
#ifdef FD_CLOEXEC
/* Set the close-on-exec flag after the fact (not race-free). */
fcntl(fd, F_SETFD, FD_CLOEXEC);
#endif
return fd;
#else
int fd;
if (_sopen_s(&fd, pathname, flags, _SH_DENYNO, mode) != 0)
return -1;
return fd;
#endif
}
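/*
* Sketch (illustrative): installing one of the open callbacks above on a
* configuration object with the public rd_kafka_conf_set_open_cb() API
* (not available on Windows builds):
*
*   rd_kafka_conf_t *conf = rd_kafka_conf_new();
*   rd_kafka_conf_set_open_cb(conf, rd_kafka_open_cb_linux);
*
* As seen above, rd_kafka_open_cb_linux() itself falls back to
* rd_kafka_open_cb_generic() when O_CLOEXEC is not available.
*/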
static int rd_kafka_offset_file_open (rd_kafka_toppar_t *rktp) {
rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
int fd;
#ifndef _MSC_VER
mode_t mode = 0644;
#else
mode_t mode = _S_IREAD|_S_IWRITE;
#endif
if ((fd = rk->rk_conf.open_cb(rktp->rktp_offset_path,
O_CREAT|O_RDWR, mode,
rk->rk_conf.opaque)) == -1) {
rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
RD_KAFKA_RESP_ERR__FS,
"%s [%"PRId32"]: "
"Failed to open offset file %s: %s",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rktp->rktp_offset_path, rd_strerror(errno));
return -1;
}
rktp->rktp_offset_fp =
#ifndef _MSC_VER
fdopen(fd, "r+");
#else
_fdopen(fd, "r+");
#endif
return 0;
}
static int64_t rd_kafka_offset_file_read (rd_kafka_toppar_t *rktp) {
char buf[22];
char *end;
int64_t offset;
size_t r;
if (fseek(rktp->rktp_offset_fp, 0, SEEK_SET) == -1) {
rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
RD_KAFKA_RESP_ERR__FS,
"%s [%"PRId32"]: "
"Seek (for read) failed on offset file %s: %s",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rktp->rktp_offset_path,
rd_strerror(errno));
rd_kafka_offset_file_close(rktp);
return RD_KAFKA_OFFSET_INVALID;
}
r = fread(buf, 1, sizeof(buf) - 1, rktp->rktp_offset_fp);
if (r == 0) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
"%s [%"PRId32"]: offset file (%s) is empty",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rktp->rktp_offset_path);
return RD_KAFKA_OFFSET_INVALID;
}
buf[r] = '\0';
offset = strtoull(buf, &end, 10);
if (buf == end) {
rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
RD_KAFKA_RESP_ERR__FS,
"%s [%"PRId32"]: "
"Unable to parse offset in %s",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rktp->rktp_offset_path);
return RD_KAFKA_OFFSET_INVALID;
}
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
"%s [%"PRId32"]: Read offset %"PRId64" from offset "
"file (%s)",
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
offset, rktp->rktp_offset_path);
return offset;
}
/**
* Sync/flush offset file.
*/
static int rd_kafka_offset_file_sync (rd_kafka_toppar_t *rktp) {
if (!rktp->rktp_offset_fp)
return 0;
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "SYNC",
"%s [%"PRId32"]: offset file sync",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition);
#ifndef _MSC_VER
(void)fflush(rktp->rktp_offset_fp);
(void)fsync(fileno(rktp->rktp_offset_fp)); // FIXME
#else
// FIXME
// FlushFileBuffers(_get_osfhandle(fileno(rktp->rktp_offset_fp)));
#endif
return 0;
}
/**
* Write offset to offset file.
*
* Locality: toppar's broker thread
*/
static rd_kafka_resp_err_t
rd_kafka_offset_file_commit (rd_kafka_toppar_t *rktp) {
rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
int attempt;
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
int64_t offset = rktp->rktp_stored_offset;
for (attempt = 0 ; attempt < 2 ; attempt++) {
char buf[22];
int len;
if (!rktp->rktp_offset_fp)
if (rd_kafka_offset_file_open(rktp) == -1)
continue;
if (fseek(rktp->rktp_offset_fp, 0, SEEK_SET) == -1) {
rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
RD_KAFKA_RESP_ERR__FS,
"%s [%"PRId32"]: "
"Seek failed on offset file %s: %s",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rktp->rktp_offset_path,
rd_strerror(errno));
err = RD_KAFKA_RESP_ERR__FS;
rd_kafka_offset_file_close(rktp);
continue;
}
len = rd_snprintf(buf, sizeof(buf), "%"PRId64"\n", offset);
if (fwrite(buf, 1, len, rktp->rktp_offset_fp) < (size_t)len) {
rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
RD_KAFKA_RESP_ERR__FS,
"%s [%"PRId32"]: "
"Failed to write offset %"PRId64" to "
"offset file %s: %s",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
offset,
rktp->rktp_offset_path,
rd_strerror(errno));
err = RD_KAFKA_RESP_ERR__FS;
rd_kafka_offset_file_close(rktp);
continue;
}
/* Need to flush before truncate to preserve write ordering */
(void)fflush(rktp->rktp_offset_fp);
/* Truncate file */
#ifdef _MSC_VER
if (_chsize_s(_fileno(rktp->rktp_offset_fp), len) != 0)
; /* Ignore truncate failures */
#else
if (ftruncate(fileno(rktp->rktp_offset_fp), len) == -1)
; /* Ignore truncate failures */
#endif
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
"%s [%"PRId32"]: wrote offset %"PRId64" to "
"file %s",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, offset,
rktp->rktp_offset_path);
rktp->rktp_committed_offset = offset;
/* If sync interval is set to immediate we sync right away. */
if (rkt->rkt_conf.offset_store_sync_interval_ms == 0)
rd_kafka_offset_file_sync(rktp);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
return err;
}
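/*
* Resulting offset file layout (sketch): a single decimal offset terminated
* by a newline, with the file truncated to exactly that length, e.g.
*
*   1234567
*/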
/**
* Enqueue offset_commit_cb op, if configured.
*
*/
void rd_kafka_offset_commit_cb_op (rd_kafka_t *rk,
rd_kafka_resp_err_t err,
const rd_kafka_topic_partition_list_t *offsets) {
rd_kafka_op_t *rko;
if (!(rk->rk_conf.enabled_events & RD_KAFKA_EVENT_OFFSET_COMMIT))
return;
rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT|RD_KAFKA_OP_REPLY);
rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
rko->rko_err = err;
rko->rko_u.offset_commit.cb = rk->rk_conf.offset_commit_cb;/*maybe NULL*/
rko->rko_u.offset_commit.opaque = rk->rk_conf.opaque;
if (offsets)
rko->rko_u.offset_commit.partitions =
rd_kafka_topic_partition_list_copy(offsets);
rd_kafka_q_enq(rk->rk_rep, rko);
}
/**
* Commit a list of offsets asynchronously. Response will be queued on 'replyq'.
* Optional \p cb will be set on requesting op.
*
* Makes a copy of \p offsets (may be NULL for current assignment)
*/
static rd_kafka_resp_err_t
rd_kafka_commit0 (rd_kafka_t *rk,
const rd_kafka_topic_partition_list_t *offsets,
rd_kafka_toppar_t *rktp,
rd_kafka_replyq_t replyq,
void (*cb) (rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *offsets,
void *opaque),
void *opaque,
const char *reason) {
rd_kafka_cgrp_t *rkcg;
rd_kafka_op_t *rko;
if (!(rkcg = rd_kafka_cgrp_get(rk)))
return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT);
rko->rko_u.offset_commit.reason = rd_strdup(reason);
rko->rko_replyq = replyq;
rko->rko_u.offset_commit.cb = cb;
rko->rko_u.offset_commit.opaque = opaque;
if (rktp)
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
if (offsets)
rko->rko_u.offset_commit.partitions =
rd_kafka_topic_partition_list_copy(offsets);
rd_kafka_q_enq(rkcg->rkcg_ops, rko);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
/**
* NOTE: \p offsets may be NULL, in which case the current assignment's
* offsets are committed; see the public API documentation.
*/
rd_kafka_resp_err_t
rd_kafka_commit (rd_kafka_t *rk,
const rd_kafka_topic_partition_list_t *offsets, int async) {
rd_kafka_cgrp_t *rkcg;
rd_kafka_resp_err_t err;
rd_kafka_q_t *repq = NULL;
rd_kafka_replyq_t rq = RD_KAFKA_NO_REPLYQ;
if (!(rkcg = rd_kafka_cgrp_get(rk)))
return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
if (!async) {
repq = rd_kafka_q_new(rk);
rq = RD_KAFKA_REPLYQ(repq, 0);
}
err = rd_kafka_commit0(rk, offsets, NULL, rq, NULL, NULL, "manual");
if (!err && !async) {
err = rd_kafka_q_wait_result(repq, RD_POLL_INFINITE);
rd_kafka_q_destroy(repq);
}
return err;
}
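/*
* Usage sketch (illustrative): synchronously commit the current assignment
* by passing NULL for \p offsets and async=0:
*
*   // async=0: block until the commit result is available
*   rd_kafka_resp_err_t err = rd_kafka_commit(rk, NULL, 0);
*   if (err)
*           fprintf(stderr, "Commit failed: %s\n", rd_kafka_err2str(err));
*/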
rd_kafka_resp_err_t
rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
int async) {
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_topic_partition_t *rktpar;
rd_kafka_resp_err_t err;
if (rkmessage->err)
return RD_KAFKA_RESP_ERR__INVALID_ARG;
offsets = rd_kafka_topic_partition_list_new(1);
rktpar = rd_kafka_topic_partition_list_add(
offsets, rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition);
rktpar->offset = rkmessage->offset+1;
err = rd_kafka_commit(rk, offsets, async);
rd_kafka_topic_partition_list_destroy(offsets);
return err;
}
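/*
* Usage sketch (illustrative): asynchronously commit the offset of a message
* that has just been processed. As implemented above, offset+1 (the next
* message to consume) is what actually gets committed.
*
*   rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 100);
*   if (rkmessage) {
*           if (!rkmessage->err) {
*                   // ... process the message ...
*                   rd_kafka_commit_message(rk, rkmessage, 1);  // async
*           }
*           rd_kafka_message_destroy(rkmessage);
*   }
*/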
rd_kafka_resp_err_t
rd_kafka_commit_queue (rd_kafka_t *rk,
const rd_kafka_topic_partition_list_t *offsets,
rd_kafka_queue_t *rkqu,
void (*cb) (rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *offsets,
void *opaque),
void *opaque) {
rd_kafka_q_t *rkq;
rd_kafka_resp_err_t err;
if (!rd_kafka_cgrp_get(rk))
return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
if (rkqu)
rkq = rkqu->rkqu_q;
else
rkq = rd_kafka_q_new(rk);
err = rd_kafka_commit0(rk, offsets, NULL,
RD_KAFKA_REPLYQ(rkq, 0),
cb, opaque, "manual");
if (!rkqu) {
rd_kafka_op_t *rko =
rd_kafka_q_pop_serve(rkq, RD_POLL_INFINITE,
0, RD_KAFKA_Q_CB_FORCE_RETURN,
NULL, NULL);
if (!rko)
err = RD_KAFKA_RESP_ERR__TIMED_OUT;
else {
if (cb)
cb(rk, rko->rko_err,
rko->rko_u.offset_commit.partitions,
opaque);
err = rko->rko_err;
rd_kafka_op_destroy(rko);
}
rd_kafka_q_destroy(rkq);
}
return err;
}
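/*
* Usage sketch (illustrative): commit the current assignment and receive the
* result through a callback. my_commit_cb is a hypothetical application
* function; passing a NULL queue makes the call block until the callback
* has run, as implemented above.
*
*   static void my_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
*                             rd_kafka_topic_partition_list_t *offsets,
*                             void *opaque) {
*           fprintf(stderr, "Commit result: %s\n", rd_kafka_err2str(err));
*   }
*
*   rd_kafka_commit_queue(rk, NULL, NULL, my_commit_cb, NULL);
*/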
/**
* Called when a broker commit is done.
*
* Locality: toppar handler thread
* Locks: none
*/
static void
rd_kafka_offset_broker_commit_cb (rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *offsets,
void *opaque) {
shptr_rd_kafka_toppar_t *s_rktp;
rd_kafka_toppar_t *rktp;
rd_kafka_topic_partition_t *rktpar;
if (offsets->cnt == 0) {
rd_kafka_dbg(rk, TOPIC, "OFFSETCOMMIT",
"No offsets to commit (commit_cb)");
return;
}
rktpar = &offsets->elems[0];
if (!(s_rktp = rd_kafka_topic_partition_list_get_toppar(rk, rktpar))) {
rd_kafka_dbg(rk, TOPIC, "OFFSETCOMMIT",
"No local partition found for %s [%"PRId32"] "
"while parsing OffsetCommit response "
"(offset %"PRId64", error \"%s\")",
rktpar->topic,
rktpar->partition,
rktpar->offset,
rd_kafka_err2str(rktpar->err));
return;
}
rktp = rd_kafka_toppar_s2i(s_rktp);
if (!err)
err = rktpar->err;
rd_kafka_toppar_offset_commit_result(rktp, err, offsets);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
"%s [%"PRId32"]: offset %"PRId64" committed: %s",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, rktpar->offset,
rd_kafka_err2str(err));
rktp->rktp_committing_offset = 0;
rd_kafka_toppar_lock(rktp);
if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING)
rd_kafka_offset_store_term(rktp, err);
rd_kafka_toppar_unlock(rktp);
rd_kafka_toppar_destroy(s_rktp);
}
static rd_kafka_resp_err_t
rd_kafka_offset_broker_commit (rd_kafka_toppar_t *rktp, const char *reason) {
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_topic_partition_t *rktpar;
rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_cgrp != NULL);
rd_kafka_assert(rktp->rktp_rkt->rkt_rk,
rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE);
rktp->rktp_committing_offset = rktp->rktp_stored_offset;
offsets = rd_kafka_topic_partition_list_new(1);
rktpar = rd_kafka_topic_partition_list_add(
offsets, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
rktpar->offset = rktp->rktp_committing_offset;
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSETCMT",
"%.*s [%"PRId32"]: committing offset %"PRId64": %s",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, rktp->rktp_committing_offset,
reason);
rd_kafka_commit0(rktp->rktp_rkt->rkt_rk, offsets, rktp,
RD_KAFKA_REPLYQ(rktp->rktp_ops, 0),
rd_kafka_offset_broker_commit_cb, NULL,
reason);
rd_kafka_topic_partition_list_destroy(offsets);
return RD_KAFKA_RESP_ERR__IN_PROGRESS;
}
/**
* Commit offset to backing store.
* This might be an async operation.
*
* Locality: toppar handler thread
*/
static
rd_kafka_resp_err_t rd_kafka_offset_commit (rd_kafka_toppar_t *rktp,
const char *reason) {
if (1) // FIXME
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
"%s [%"PRId32"]: commit: "
"stored offset %"PRId64" > committed offset %"PRId64"?",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rktp->rktp_stored_offset, rktp->rktp_committed_offset);
/* Already committed */
if (rktp->rktp_stored_offset <= rktp->rktp_committed_offset)
return RD_KAFKA_RESP_ERR_NO_ERROR;
/* Already committing (for async ops) */
if (rktp->rktp_stored_offset <= rktp->rktp_committing_offset)
return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
switch (rktp->rktp_rkt->rkt_conf.offset_store_method)
{
case RD_KAFKA_OFFSET_METHOD_FILE:
return rd_kafka_offset_file_commit(rktp);
case RD_KAFKA_OFFSET_METHOD_BROKER:
return rd_kafka_offset_broker_commit(rktp, reason);
default:
/* UNREACHABLE */
return RD_KAFKA_RESP_ERR__INVALID_ARG;
}
}
/**
* Sync offset backing store. This is only used for METHOD_FILE.
*
* Locality: rktp's broker thread.
*/
rd_kafka_resp_err_t rd_kafka_offset_sync (rd_kafka_toppar_t *rktp) {
switch (rktp->rktp_rkt->rkt_conf.offset_store_method)
{
case RD_KAFKA_OFFSET_METHOD_FILE:
return rd_kafka_offset_file_sync(rktp);
default:
return RD_KAFKA_RESP_ERR__INVALID_ARG;
}
}
/**
* Store offset.
* Typically called from application code.
* The offset actually stored is \p offset + 1, i.e. the offset of the
* next message to consume.
*
* NOTE: Must be called without holding any locks.
*/
rd_kafka_resp_err_t rd_kafka_offset_store (rd_kafka_topic_t *app_rkt,
int32_t partition, int64_t offset) {
rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
shptr_rd_kafka_toppar_t *s_rktp;
/* Find toppar */
rd_kafka_topic_rdlock(rkt);
if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*!ua_on_miss*/))) {
rd_kafka_topic_rdunlock(rkt);
return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
}
rd_kafka_topic_rdunlock(rkt);
rd_kafka_offset_store0(rd_kafka_toppar_s2i(s_rktp), offset+1,
1/*lock*/);
rd_kafka_toppar_destroy(s_rktp);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
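/*
* Usage sketch (illustrative, legacy consumer): store the offset of a message
* returned by rd_kafka_consume() once it has been processed; the next
* (auto) commit will then persist offset + 1.
*
*   // rkt and rkmessage are assumed to come from the consume loop
*   rd_kafka_offset_store(rkt, rkmessage->partition, rkmessage->offset);
*/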
rd_kafka_resp_err_t
rd_kafka_offsets_store (rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *offsets) {
int i;
int ok_cnt = 0;
for (i = 0 ; i < offsets->cnt ; i++) {
rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
shptr_rd_kafka_toppar_t *s_rktp;
s_rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar);
if (!s_rktp) {
rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
continue;
}
rd_kafka_offset_store0(rd_kafka_toppar_s2i(s_rktp),
rktpar->offset, 1/*lock*/);
rd_kafka_toppar_destroy(s_rktp);
rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
ok_cnt++;
}
return offsets->cnt > 0 && ok_cnt < offsets->cnt ?
RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION :
RD_KAFKA_RESP_ERR_NO_ERROR;
}
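/*
* Usage sketch (illustrative): store offsets for multiple partitions in one
* call. "mytopic" and the offset value are hypothetical; each element's
* .offset should be the offset of the next message to consume
* (last processed + 1).
*
*   rd_kafka_topic_partition_list_t *parts =
*           rd_kafka_topic_partition_list_new(1);
*   rd_kafka_topic_partition_list_add(parts, "mytopic", 0)->offset = 1235;
*   rd_kafka_resp_err_t err = rd_kafka_offsets_store(rk, parts);
*   rd_kafka_topic_partition_list_destroy(parts);
*/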
/**
* Decommissions the use of an offset file for a toppar.
* The file content will not be touched and the file will not be removed.
*/
static rd_kafka_resp_err_t rd_kafka_offset_file_term (rd_kafka_toppar_t *rktp) {
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
/* Sync the offset file if a sync interval is configured (> 0) */
if (rktp->rktp_rkt->rkt_conf.offset_store_sync_interval_ms > 0) {
rd_kafka_offset_file_sync(rktp);
rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
&rktp->rktp_offset_sync_tmr, 1/*lock*/);
}
rd_kafka_offset_file_close(rktp);
rd_free(rktp->rktp_offset_path);
rktp->rktp_offset_path = NULL;
return err;
}
static rd_kafka_op_res_t
rd_kafka_offset_reset_op_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq,
rd_kafka_op_t *rko) {
rd_kafka_toppar_t *rktp =
rd_kafka_toppar_s2i(rko->rko_rktp);
rd_kafka_toppar_lock(rktp);
rd_kafka_offset_reset(rktp,
rko->rko_u.offset_reset.offset,
rko->rko_err, rko->rko_u.offset_reset.reason);
rd_kafka_toppar_unlock(rktp);
return RD_KAFKA_OP_RES_HANDLED;
}
/**
* Take action when the offset for a toppar becomes unusable.
*
* Locality: toppar handler thread
* Locks: toppar_lock() MUST be held
*/
void rd_kafka_offset_reset (rd_kafka_toppar_t *rktp, int64_t err_offset,
rd_kafka_resp_err_t err, const char *reason) {
int64_t offset = RD_KAFKA_OFFSET_INVALID;
rd_kafka_op_t *rko;
/* Enqueue op for toppar handler thread if we're on the wrong thread. */
if (!thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread)) {
rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_RESET |
RD_KAFKA_OP_CB);
rko->rko_op_cb = rd_kafka_offset_reset_op_cb;
rko->rko_err = err;
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rko->rko_u.offset_reset.offset = err_offset;
rko->rko_u.offset_reset.reason = rd_strdup(reason);
rd_kafka_q_enq(rktp->rktp_ops, rko);
return;
}
if (err_offset == RD_KAFKA_OFFSET_INVALID || err)
offset = rktp->rktp_rkt->rkt_conf.auto_offset_reset;
else
offset = err_offset;
if (offset == RD_KAFKA_OFFSET_INVALID) {
/* Error, auto.offset.reset tells us to error out. */
rko = rd_kafka_op_new(RD_KAFKA_OP_CONSUMER_ERR);
rko->rko_err = err;
rko->rko_u.err.offset = err_offset;
rko->rko_u.err.errstr = rd_strdup(reason);
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rd_kafka_q_enq(rktp->rktp_fetchq, rko);
rd_kafka_toppar_set_fetch_state(
rktp, RD_KAFKA_TOPPAR_FETCH_NONE);
} else {
/* Query logical offset */
rktp->rktp_query_offset = offset;
rd_kafka_toppar_set_fetch_state(
rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
}
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
"%s [%"PRId32"]: offset reset (at offset %s) "
"to %s: %s: %s",
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
rd_kafka_offset2str(err_offset),
rd_kafka_offset2str(offset),
reason, rd_kafka_err2str(err));
if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
rd_kafka_toppar_offset_request(rktp, rktp->rktp_query_offset, 0);
}
/**
* Escape any special characters in filename 'in' and write escaped
* string to 'out' (of max size out_size).
*/
static char *mk_esc_filename (const char *in, char *out, size_t out_size) {
const char *s = in;
char *o = out;
while (*s) {
const char *esc;
size_t esclen;
switch (*s)
{
case '/': /* linux */
esc = "%2F";
esclen = strlen(esc);
break;
case ':': /* osx, windows */
esc = "%3A";
esclen = strlen(esc);
break;
case '\\': /* windows */
esc = "%5C";
esclen = strlen(esc);
break;
default:
esc = s;
esclen = 1;
break;
}
if ((size_t)((o + esclen + 1) - out) >= out_size) {
/* No more space in output string, truncate. */
break;
}
while (esclen-- > 0)
*(o++) = *(esc++);
s++;
}
*o = '\0';
return out;
}
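/*
* Example (illustrative): a topic name containing a path separator is
* escaped before it is used as part of an offset file name:
*
*   char esc[64];
*   mk_esc_filename("logs/2017", esc, sizeof(esc));
*   // esc now contains "logs%2F2017"
*/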
static void rd_kafka_offset_sync_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
rd_kafka_toppar_t *rktp = arg;
rd_kafka_offset_sync(rktp);
}
/**
* Prepare a toppar for using an offset file.
*
* Locality: rdkafka main thread
* Locks: toppar_lock(rktp) must be held
*/
static void rd_kafka_offset_file_init (rd_kafka_toppar_t *rktp) {
char spath[4096];
const char *path = rktp->rktp_rkt->rkt_conf.offset_store_path;
int64_t offset = RD_KAFKA_OFFSET_INVALID;
if (rd_kafka_path_is_dir(path)) {
char tmpfile[1024];
char escfile[4096];
/* Include group.id in filename if configured. */
if (!RD_KAFKAP_STR_IS_NULL(rktp->rktp_rkt->rkt_rk->rk_group_id))
rd_snprintf(tmpfile, sizeof(tmpfile),
"%s-%"PRId32"-%.*s.offset",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_rk->
rk_group_id));
else
rd_snprintf(tmpfile, sizeof(tmpfile),
"%s-%"PRId32".offset",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition);
/* Escape filename to make it safe. */
mk_esc_filename(tmpfile, escfile, sizeof(escfile));
rd_snprintf(spath, sizeof(spath), "%s%s%s",
path, path[strlen(path)-1] == '/' ? "" : "/", escfile);
path = spath;
}
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
"%s [%"PRId32"]: using offset file %s",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
path);
rktp->rktp_offset_path = rd_strdup(path);
/* Set up the offset file sync interval. */
if (rktp->rktp_rkt->rkt_conf.offset_store_sync_interval_ms > 0)
rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
&rktp->rktp_offset_sync_tmr,
rktp->rktp_rkt->rkt_conf.
offset_store_sync_interval_ms * 1000ll,
rd_kafka_offset_sync_tmr_cb, rktp);
if (rd_kafka_offset_file_open(rktp) != -1) {
/* Read offset from offset file. */
offset = rd_kafka_offset_file_read(rktp);
}
if (offset != RD_KAFKA_OFFSET_INVALID) {
/* Start fetching from offset */
rktp->rktp_stored_offset = offset;
rktp->rktp_committed_offset = offset;
rd_kafka_toppar_next_offset_handle(rktp, offset);
} else {
/* Offset was not usable: perform offset reset logic */
rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
rd_kafka_offset_reset(rktp, RD_KAFKA_OFFSET_INVALID,
RD_KAFKA_RESP_ERR__FS,
"non-readable offset file");
}
}
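/*
* Example (illustrative, hypothetical values): with offset.store.path set to
* the directory /var/lib/myapp, group.id "mygroup", topic "mytopic" and
* partition 0, the code above resolves the offset file to:
*
*   /var/lib/myapp/mytopic-0-mygroup.offset
*/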
/**
* Terminate broker offset store
*/
static rd_kafka_resp_err_t rd_kafka_offset_broker_term (rd_kafka_toppar_t *rktp){
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
/**
* Prepare a toppar for using broker offset commit (broker 0.8.2 or later).
* When using KafkaConsumer (high-level consumer) this functionality is
* disabled in favour of the cgrp commits for the entire set of subscriptions.
*/
static void rd_kafka_offset_broker_init (rd_kafka_toppar_t *rktp) {
if (!rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk))
return;
rd_kafka_offset_reset(rktp, RD_KAFKA_OFFSET_STORED, 0,
"query broker for offsets");
}
/**
* Terminates the toppar's offset store; this is the finalizing step after
* offset_store_stop().
*
* Locks: rd_kafka_toppar_lock() MUST be held.
*/
void rd_kafka_offset_store_term (rd_kafka_toppar_t *rktp,
rd_kafka_resp_err_t err) {
rd_kafka_resp_err_t err2;
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "STORETERM",
"%s [%"PRId32"]: offset store terminating",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition);
rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING;
rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
&rktp->rktp_offset_commit_tmr, 1/*lock*/);
switch (rktp->rktp_rkt->rkt_conf.offset_store_method)
{
case RD_KAFKA_OFFSET_METHOD_FILE:
err2 = rd_kafka_offset_file_term(rktp);
break;
case RD_KAFKA_OFFSET_METHOD_BROKER:
err2 = rd_kafka_offset_broker_term(rktp);
break;
case RD_KAFKA_OFFSET_METHOD_NONE:
err2 = RD_KAFKA_RESP_ERR_NO_ERROR;
break;
}
/* Prioritize the input error (probably from commit), fall
* back on termination error. */
if (!err)
err = err2;
rd_kafka_toppar_fetch_stopped(rktp, err);
}
/**
* Stop toppar's offset store, committing the final offsets, etc.
*
* Returns RD_KAFKA_RESP_ERR_NO_ERROR on success,
* RD_KAFKA_RESP_ERR__IN_PROGRESS if the term triggered an
* async operation (e.g., broker offset commit), or
* any other error in case of immediate failure.
*
* The offset layer will call rd_kafka_offset_store_term() when
* the offset management has been fully stopped for this partition.
*
* Locks: rd_kafka_toppar_lock() MUST be held.
*/
rd_kafka_resp_err_t rd_kafka_offset_store_stop (rd_kafka_toppar_t *rktp) {
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE))
goto done;
rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING;
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
"%s [%"PRId32"]: stopping offset store "
"(stored offset %"PRId64
", committed offset %"PRId64", EOF offset %"PRId64")",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rktp->rktp_stored_offset, rktp->rktp_committed_offset,
rktp->rktp_offsets_fin.eof_offset);
/* Store end offset for empty partitions */
if (rktp->rktp_rkt->rkt_rk->rk_conf.enable_auto_offset_store &&
rktp->rktp_stored_offset == RD_KAFKA_OFFSET_INVALID &&
rktp->rktp_offsets_fin.eof_offset > 0)
rd_kafka_offset_store0(rktp, rktp->rktp_offsets_fin.eof_offset,
0/*no lock*/);
/* Commit offset to backing store.
* This might be an async operation. */
if (rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk) &&
rktp->rktp_stored_offset > rktp->rktp_committed_offset)
err = rd_kafka_offset_commit(rktp, "offset store stop");
/* If stop is in progress (async commit), return now. */
if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
return err;
done:
/* Stop is done */
rd_kafka_offset_store_term(rktp, err);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
static void rd_kafka_offset_auto_commit_tmr_cb (rd_kafka_timers_t *rkts,
void *arg) {
rd_kafka_toppar_t *rktp = arg;
rd_kafka_offset_commit(rktp, "auto commit timer");
}
void rd_kafka_offset_query_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
rd_kafka_toppar_t *rktp = arg;
rd_kafka_toppar_lock(rktp);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
"Topic %s [%"PRId32"]: timed offset query for %s in "
"state %s",
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
rd_kafka_offset2str(rktp->rktp_query_offset),
rd_kafka_fetch_states[rktp->rktp_fetch_state]);
rd_kafka_toppar_offset_request(rktp, rktp->rktp_query_offset, 0);
rd_kafka_toppar_unlock(rktp);
}
/**
* Initialize toppar's offset store.
*
* Locality: toppar handler thread
*/
void rd_kafka_offset_store_init (rd_kafka_toppar_t *rktp) {
static const char *store_names[] = { "none", "file", "broker" };
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
"%s [%"PRId32"]: using offset store method: %s",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
store_names[rktp->rktp_rkt->rkt_conf.offset_store_method]);
/* The committed offset is unknown at this point. */
rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
/* Set up the commit interval (for simple consumer). */
if (rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk) &&
rktp->rktp_rkt->rkt_conf.auto_commit_interval_ms > 0)
rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
&rktp->rktp_offset_commit_tmr,
rktp->rktp_rkt->rkt_conf.
auto_commit_interval_ms * 1000ll,
rd_kafka_offset_auto_commit_tmr_cb,
rktp);
switch (rktp->rktp_rkt->rkt_conf.offset_store_method)
{
case RD_KAFKA_OFFSET_METHOD_FILE:
rd_kafka_offset_file_init(rktp);
break;
case RD_KAFKA_OFFSET_METHOD_BROKER:
rd_kafka_offset_broker_init(rktp);
break;
case RD_KAFKA_OFFSET_METHOD_NONE:
break;
default:
/* NOTREACHED */
return;
}
rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_OFFSET_STORE;
}