blob: 516649f0f35b8e81e742e37d258a49eb00869274 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "load/routine_load/data_consumer.h"
#include <absl/strings/str_split.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <librdkafka/rdkafkacpp.h>
// AWS Kinesis SDK includes
#include <aws/core/client/ClientConfiguration.h>
#include <aws/core/utils/Outcome.h>
#include <aws/kinesis/KinesisClient.h>
#include <aws/kinesis/model/GetRecordsRequest.h>
#include <aws/kinesis/model/GetRecordsResult.h>
#include <aws/kinesis/model/GetShardIteratorRequest.h>
#include <aws/kinesis/model/GetShardIteratorResult.h>
#include <aws/kinesis/model/ListShardsRequest.h>
#include <aws/kinesis/model/ListShardsResult.h>
#include <aws/kinesis/model/Record.h>
#include <aws/kinesis/model/ShardIteratorType.h>
#include <algorithm>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "common/config.h"
#include "common/metrics/doris_metrics.h"
#include "common/status.h"
#include "load/routine_load/consumer_helpers.h"
#include "load/routine_load/kinesis_conf.h"
#include "runtime/aws_msk_iam_auth.h"
#include "runtime/exec_env.h"
#include "runtime/small_file_mgr.h"
#include "service/backend_options.h"
#include "util/blocking_queue.hpp"
#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/s3_util.h"
#include "util/stopwatch.hpp"
#include "util/string_util.h"
#include "util/uid_util.h"
namespace doris {
// Key of the consumer group id property. It is used both when looking the
// property up in the job's custom properties and when setting it on the
// librdkafka configuration.
static const std::string PROP_GROUP_ID = "group.id";
// Initializes the librdkafka consumer owned by this object.
//
// Only common configs (brokers, group id, reconnect/version settings) and the
// job's custom properties from ctx->kafka_info->properties are applied here:
//   - "aws.*" properties are recorded in _custom_properties but are never
//     forwarded to librdkafka;
//   - values of the form "FILE:file_id:md5" are resolved to a local file path
//     through the small file manager before being forwarded.
// A group id is generated when the job did not provide one, and AWS MSK IAM
// authentication is wired up when
// AwsMskIamOAuthCallback::create_from_properties() returns a callback.
//
// Returns OK when the consumer is (or already was) initialized, otherwise an
// InternalError describing the first failure.
Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
    std::unique_lock<std::mutex> l(_lock);
    if (_init) {
        // this consumer has already been initialized.
        return Status::OK();
    }

    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    // conf has to be deleted finally
    Defer delete_conf {[conf]() { delete conf; }};

    std::string errstr;
    // Sets one key/value on the global conf. Unknown keys and keys librdkafka
    // reports as "not supported" are tolerated; any other failure is an error.
    auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
        RdKafka::Conf::ConfResult res = conf->set(conf_key, conf_val, errstr);
        // Check the result code first so that the content of errstr is only
        // interpreted when this very call failed.
        if (res == RdKafka::Conf::CONF_OK) {
            VLOG_NOTICE << "set " << conf_key << ": " << conf_val;
            return Status::OK();
        }
        if (res == RdKafka::Conf::CONF_UNKNOWN) {
            // ignore unknown config
            return Status::OK();
        }
        if (errstr.find("not supported") != std::string::npos) {
            // some java-only properties may be passed to here, and librdkafka will
            // return "'xxx' not supported". ignore it
            return Status::OK();
        }
        std::stringstream ss;
        ss << "PAUSE: failed to set '" << conf_key << "', value: '" << conf_val
           << "', err: " << errstr;
        LOG(WARNING) << ss.str();
        return Status::InternalError(ss.str());
    };

    RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers));
    RETURN_IF_ERROR(set_conf("enable.partition.eof", "true"));
    RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
    // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
    RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
    RETURN_IF_ERROR(set_conf("auto.offset.reset", "error"));
    RETURN_IF_ERROR(set_conf("socket.keepalive.enable", "true"));
    RETURN_IF_ERROR(set_conf("reconnect.backoff.ms", "100"));
    RETURN_IF_ERROR(set_conf("reconnect.backoff.max.ms", "10000"));
    RETURN_IF_ERROR(set_conf("api.version.request", config::kafka_api_version_request));
    RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0"));
    RETURN_IF_ERROR(set_conf("broker.version.fallback", config::kafka_broker_version_fallback));
    RETURN_IF_ERROR(set_conf("broker.address.ttl", "0"));
    if (config::kafka_debug != "disable") {
        RETURN_IF_ERROR(set_conf("debug", config::kafka_debug));
    }

    for (auto& item : ctx->kafka_info->properties) {
        _custom_properties.emplace(item.first, item.second);
        // AWS properties (aws.*) are Doris-specific for MSK IAM authentication
        // and should not be passed to librdkafka
        if (starts_with(item.first, "aws.")) {
            LOG(INFO) << "Skipping AWS property for librdkafka: " << item.first;
            continue;
        }
        if (starts_with(item.second, "FILE:")) {
            // file property should have the format: FILE:file_id:md5
            std::vector<std::string> parts =
                    absl::StrSplit(item.second, ":", absl::SkipWhitespace());
            if (parts.size() != 3) {
                return Status::InternalError("PAUSE: Invalid file property of kafka: " +
                                             item.second);
            }
            // std::stol throws on non-numeric or out-of-range input; the value
            // comes from a user property, so report it as a Status instead of
            // letting the exception escape.
            int64_t file_id = 0;
            try {
                file_id = std::stol(parts[1]);
            } catch (const std::exception&) {
                return Status::InternalError("PAUSE: Invalid file property of kafka: " +
                                             item.second);
            }
            std::string file_path;
            Status st = ctx->exec_env()->small_file_mgr()->get_file(file_id, parts[2], &file_path);
            if (!st.ok()) {
                return Status::InternalError("PAUSE: failed to get file for config: {}, error: {}",
                                             item.first, st.to_string());
            }
            RETURN_IF_ERROR(set_conf(item.first, file_path));
        } else {
            RETURN_IF_ERROR(set_conf(item.first, item.second));
        }
    }

    // if not specified group id, generate a random one.
    // ATTN: In the new version, we have set a group.id on the FE side for jobs that have not set a groupid,
    // but in order to ensure compatibility, we still do a check here.
    if (!_custom_properties.contains(PROP_GROUP_ID)) {
        std::stringstream ss;
        ss << BackendOptions::get_localhost() << "_";
        std::string group_id = ss.str() + UniqueId::gen_uid().to_string();
        RETURN_IF_ERROR(set_conf(PROP_GROUP_ID, group_id));
        _custom_properties.emplace(PROP_GROUP_ID, group_id);
    }
    LOG(INFO) << "init kafka consumer with group id: " << _custom_properties[PROP_GROUP_ID];

    if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
        std::stringstream ss;
        ss << "PAUSE: failed to set 'event_cb'";
        LOG(WARNING) << ss.str();
        return Status::InternalError(ss.str());
    }

    // Set up AWS MSK IAM authentication if configured
    _aws_msk_oauth_callback = AwsMskIamOAuthCallback::create_from_properties(
            _custom_properties, ctx->kafka_info->brokers);
    if (_aws_msk_oauth_callback) {
        // Enable SASL queue to support background callbacks
        if (conf->enable_sasl_queue(true, errstr) != RdKafka::Conf::CONF_OK) {
            LOG(WARNING) << "PAUSE: failed to enable SASL queue: " << errstr;
            return Status::InternalError("PAUSE: failed to enable SASL queue: " + errstr);
        }
        if (conf->set("oauthbearer_token_refresh_cb", _aws_msk_oauth_callback.get(), errstr) !=
            RdKafka::Conf::CONF_OK) {
            LOG(WARNING) << "PAUSE: failed to set OAuth callback: " << errstr;
            return Status::InternalError("PAUSE: failed to set OAuth callback: " + errstr);
        }
        LOG(INFO) << "AWS MSK IAM authentication enabled successfully";
    }

    // create consumer
    _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
    if (!_k_consumer) {
        LOG(WARNING) << "PAUSE: failed to create kafka consumer: " << errstr;
        return Status::InternalError("PAUSE: failed to create kafka consumer: " + errstr);
    }

    // If AWS MSK IAM auth is enabled, inject initial token and enable background refresh
    if (_aws_msk_oauth_callback) {
        RETURN_IF_ERROR(_aws_msk_oauth_callback->refresh_now(_k_consumer));
        std::unique_ptr<RdKafka::Error> bg_err(_k_consumer->sasl_background_callbacks_enable());
        if (bg_err) {
            return Status::InternalError("Failed to enable SASL background callbacks: " +
                                         bg_err->str());
        }
        LOG(INFO) << "AWS MSK IAM: initial token set, background refresh enabled";
    }

    VLOG_NOTICE << "finished to init kafka consumer. " << ctx->brief();
    _init = true;
    return Status::OK();
}
// Assigns the given partitions of `topic` to the underlying consumer.
//
// `begin_partition_offset` maps partition id -> offset to start consuming from.
// Every partition id is also recorded in _consuming_partition_ids. On failure
// the consumer is unassigned and an InternalError is returned.
Status KafkaDataConsumer::assign_topic_partitions(
        const std::map<int32_t, int64_t>& begin_partition_offset, const std::string& topic,
        std::shared_ptr<StreamLoadContext> ctx) {
    DCHECK(_k_consumer);

    std::vector<RdKafka::TopicPartition*> partitions;
    // The TopicPartition objects are owned by this function; free them on exit.
    Defer release_partitions {[&partitions]() {
        for (RdKafka::TopicPartition* partition : partitions) {
            delete partition;
        }
    }};

    // Build the TopicPartition list together with a printable description.
    std::stringstream assignment_desc;
    for (const auto& [partition_id, begin_offset] : begin_partition_offset) {
        partitions.push_back(RdKafka::TopicPartition::create(topic, partition_id, begin_offset));
        _consuming_partition_ids.insert(partition_id);
        assignment_desc << "[" << partition_id << ": " << begin_offset << "] ";
    }
    LOG(INFO) << "consumer: " << _id << ", grp: " << _grp_id
              << " assign topic partitions: " << topic << ", " << assignment_desc.str();

    const RdKafka::ErrorCode assign_err = _k_consumer->assign(partitions);
    if (assign_err == RdKafka::ERR_NO_ERROR) {
        return Status::OK();
    }

    LOG(WARNING) << "failed to assign topic partitions: " << ctx->brief(true)
                 << ", err: " << RdKafka::err2str(assign_err);
    _k_consumer->unassign();
    return Status::InternalError("failed to assign topic partitions");
}
// Polls the Kafka consumer one message at a time and pushes the messages into
// `queue` until one of the following happens:
//   - the consumer is cancelled (_cancelled is set),
//   - `max_running_time_ms` has elapsed,
//   - the queue refuses a message (controlled_blocking_put returns false),
//   - every partition in _consuming_partition_ids has reported EOF,
//   - a non-retriable error is returned by the consumer.
//
// Ownership: a message that was successfully put into the queue is released
// from the local unique_ptr; every other message is deleted here.
//
// Returns OK unless the loop stopped on ERR_OFFSET_OUT_OF_RANGE or on an error
// code handled by the `default` branch, in which case an InternalError with the
// error text is returned.
Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
int64_t max_running_time_ms) {
int64_t left_time = max_running_time_ms;
LOG(INFO) << "start kafka consumer: " << _id << ", grp: " << _grp_id
<< ", max running time(ms): " << left_time;
int64_t received_rows = 0;
int64_t put_rows = 0;
RetryPolicy retry_policy(3, 200);
Status st = Status::OK();
// consumer_watch only measures the time spent inside consume();
// watch measures the whole call and drives the time budget.
MonotonicStopWatch consumer_watch;
MonotonicStopWatch watch;
watch.start();
while (true) {
{
std::unique_lock<std::mutex> l(_lock);
if (_cancelled) {
break;
}
}
if (left_time <= 0) {
break;
}
bool done = false;
// consume 1 message at a time
consumer_watch.start();
std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* timeout, ms */));
consumer_watch.stop();
DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
// NOTE(review): consumer_watch is never reset and the final log below treats
// elapsed_time() as the total consume cost. If elapsed_time() is cumulative,
// this adds the running total to the latency metric on every iteration --
// confirm against MonotonicStopWatch.
DorisMetrics::instance()->routine_load_get_msg_latency->increment(
consumer_watch.elapsed_time() / 1000 / 1000);
// Debug point: simulate an "offset out of range" failure.
DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", {
done = true;
std::stringstream ss;
ss << "Offset out of range"
<< ", consume partition " << msg->partition() << ", consume offset "
<< msg->offset();
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
st = Status::InternalError<false>(ss.str());
break;
});
switch (msg->err()) {
case RdKafka::ERR_NO_ERROR:
retry_policy.reset();
// make sure the partition of a delivered message is tracked as consuming
if (_consuming_partition_ids.count(msg->partition()) <= 0) {
_consuming_partition_ids.insert(msg->partition());
}
DorisMetrics::instance()->routine_load_consume_bytes->increment(msg->len());
if (msg->len() == 0) {
// ignore msg with length 0.
// put empty msg into queue will cause the load process shutting down.
// (the row counters below are skipped for such messages)
break;
} else if (!queue->controlled_blocking_put(msg.get(),
config::blocking_queue_cv_wait_timeout_ms)) {
// queue is shutdown
done = true;
} else {
++put_rows;
msg.release(); // release the ownership, msg will be deleted after being processed
}
++received_rows;
DorisMetrics::instance()->routine_load_consume_rows->increment(1);
break;
case RdKafka::ERR__TIMED_OUT:
// leave the status as OK, because this may happened
// if there is no data in kafka.
LOG(INFO) << "kafka consume timeout: " << _id;
break;
case RdKafka::ERR__TRANSPORT:
LOG(INFO) << "kafka consume Disconnected: " << _id
<< ", retry times: " << retry_policy.retry_count();
if (retry_policy.should_retry()) {
retry_policy.retry_with_backoff();
break;
}
// retries exhausted: handled the same way as a partition EOF below
[[fallthrough]];
case RdKafka::ERR__PARTITION_EOF: {
VLOG_NOTICE << "consumer meet partition eof: " << _id
<< " partition offset: " << msg->offset();
_consuming_partition_ids.erase(msg->partition());
// the EOF message itself is forwarded to the queue as well
if (!queue->controlled_blocking_put(msg.get(),
config::blocking_queue_cv_wait_timeout_ms)) {
done = true;
} else if (_consuming_partition_ids.size() <= 0) {
LOG(INFO) << "all partitions meet eof: " << _id;
msg.release();
done = true;
} else {
msg.release();
}
break;
}
case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
done = true;
std::stringstream ss;
ss << msg->errstr() << ", consume partition " << msg->partition() << ", consume offset "
<< msg->offset();
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
st = Status::InternalError<false>(ss.str());
break;
}
default:
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
done = true;
st = Status::InternalError<false>(msg->errstr());
break;
}
// refresh the remaining time budget (watch reports nanoseconds, hence / 1000 / 1000)
left_time = max_running_time_ms - watch.elapsed_time() / 1000 / 1000;
if (done) {
break;
}
}
LOG(INFO) << "kafka consumer done: " << _id << ", grp: " << _grp_id
<< ". cancelled: " << _cancelled << ", left time(ms): " << left_time
<< ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000
<< ", consume cost(ms): " << consumer_watch.elapsed_time() / 1000 / 1000
<< ", received rows: " << received_rows << ", put rows: " << put_rows;
return st;
}
// Fetches the metadata of _topic and appends the ids of all its partitions to
// `partition_ids`.
//
// Returns an InternalError when the topic handle cannot be created, when the
// metadata request fails, when the topic metadata carries an error, or when no
// partition id is available at the end.
Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids) {
    // A topic handle is needed to request metadata for this topic only.
    RdKafka::Conf* topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    Defer release_topic_conf {[topic_conf]() { delete topic_conf; }};

    std::string errstr;
    RdKafka::Topic* topic_handle = RdKafka::Topic::create(_k_consumer, _topic, topic_conf, errstr);
    if (topic_handle == nullptr) {
        std::stringstream msg;
        msg << "failed to create topic: " << errstr;
        LOG(WARNING) << msg.str();
        return Status::InternalError(msg.str());
    }
    Defer release_topic {[topic_handle]() { delete topic_handle; }};

    // Request metadata with a 5000 ms timeout.
    RdKafka::Metadata* metadata = nullptr;
    const RdKafka::ErrorCode meta_err =
            _k_consumer->metadata(false /* for this topic */, topic_handle, &metadata, 5000);
    if (meta_err != RdKafka::ERR_NO_ERROR) {
        std::stringstream msg;
        msg << "failed to get partition meta: " << RdKafka::err2str(meta_err);
        LOG(WARNING) << msg.str();
        return Status::InternalError(msg.str());
    }
    Defer release_metadata {[metadata]() { delete metadata; }};

    // Collect the partition ids of the entries that describe our topic.
    for (const auto* topic_meta : *metadata->topics()) {
        if (topic_meta->topic() != _topic) {
            continue;
        }

        const RdKafka::ErrorCode topic_err = topic_meta->err();
        if (topic_err != RdKafka::ERR_NO_ERROR) {
            std::stringstream msg;
            msg << "error: " << RdKafka::err2str(topic_err);
            if (topic_err == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
                msg << ", try again";
            }
            LOG(WARNING) << msg.str();
            return Status::InternalError(msg.str());
        }

        for (const auto* partition_meta : *topic_meta->partitions()) {
            partition_ids->push_back(partition_meta->id());
        }
    }

    if (partition_ids->empty()) {
        return Status::InternalError("no partition in this topic");
    }
    return Status::OK();
}
// Resolves timestamps to offsets for the partitions of _topic.
//
// `times`   : input, <partition, timestamp> pairs.
// `offsets` : output, one <partition, offset> pair is appended per input entry.
// `timeout` : passed through to offsetsForTimes().
//
// The returned offset for each partition is the earliest offset whose
// timestamp is greater than or equal to the given timestamp in the
// corresponding partition.
// See librdkafka/rdkafkacpp.h##offsetsForTimes()
Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>& times,
                                                std::vector<PIntegerPair>* offsets, int timeout) {
    std::vector<RdKafka::TopicPartition*> partitions;
    // The TopicPartition objects are owned locally; free them on every exit path.
    Defer release_partitions {[&partitions]() {
        for (RdKafka::TopicPartition* partition : partitions) {
            delete partition;
        }
    }};

    // The timestamp is carried in the offset field of each TopicPartition.
    for (const auto& time_entry : times) {
        partitions.push_back(
                RdKafka::TopicPartition::create(_topic, time_entry.key(), time_entry.val()));
    }

    const RdKafka::ErrorCode lookup_err = _k_consumer->offsetsForTimes(partitions, timeout);
    if (UNLIKELY(lookup_err != RdKafka::ERR_NO_ERROR)) {
        std::stringstream msg;
        msg << "failed to get offsets for times: " << RdKafka::err2str(lookup_err);
        LOG(WARNING) << msg.str();
        return Status::InternalError(msg.str());
    }

    // After the call each TopicPartition holds the resolved offset.
    for (const RdKafka::TopicPartition* partition : partitions) {
        PIntegerPair resolved;
        resolved.set_key(partition->partition());
        resolved.set_val(partition->offset());
        offsets->push_back(std::move(resolved));
    }
    return Status::OK();
}
// Queries the high watermark of every partition in `partition_ids` and appends
// a <partition, high watermark> pair to `offsets` for each of them.
//
// `timeout` is a budget in milliseconds shared by all queries: each query gets
// whatever is left of it, and an InternalError is returned once it is used up.
Status KafkaDataConsumer::get_latest_offsets_for_partitions(
        const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets,
        int timeout) {
    DBUG_EXECUTE_IF("KafkaDataConsumer.get_offsets_for_partitions.timeout", {
        // sleep 61s
        std::this_thread::sleep_for(std::chrono::seconds(61));
    });

    MonotonicStopWatch budget_watch;
    budget_watch.start();
    for (const int32_t partition_id : partition_ids) {
        // elapsed_time() is divided by 1000 twice to obtain milliseconds.
        const int remaining_ms =
                timeout - static_cast<int>(budget_watch.elapsed_time() / 1000 / 1000);
        if (UNLIKELY(remaining_ms <= 0)) {
            return Status::InternalError("get kafka latest offsets for partitions timeout");
        }

        int64_t low_watermark = 0;
        int64_t high_watermark = 0;
        const RdKafka::ErrorCode query_err = _k_consumer->query_watermark_offsets(
                _topic, partition_id, &low_watermark, &high_watermark, remaining_ms);
        if (UNLIKELY(query_err != RdKafka::ERR_NO_ERROR)) {
            std::stringstream msg;
            msg << "failed to get latest offset for partition: " << partition_id
                << ", err: " << RdKafka::err2str(query_err);
            LOG(WARNING) << msg.str();
            return Status::InternalError(msg.str());
        }

        PIntegerPair latest;
        latest.set_key(partition_id);
        latest.set_val(high_watermark);
        offsets->push_back(std::move(latest));
    }
    return Status::OK();
}
// Translates offset flags into concrete offsets.
//
// `offset_flags` holds <partition, offset-or-flag> pairs:
//   - a value >= 0 is already a concrete offset and is copied to the output;
//   - -1 (OFFSET_END_VAL) is replaced by the partition's high watermark;
//   - -2 (OFFSET_BEGINNING_VAL) is replaced by the partition's low watermark;
//   - any other negative value is rejected with an InternalError. It used to
//     produce an output pair without a value after a useless broker query.
// One pair per input entry is appended to `offsets`.
//
// `timeout` is a budget in milliseconds shared by all watermark queries; an
// InternalError is returned once it is used up.
Status KafkaDataConsumer::get_real_offsets_for_partitions(
        const std::vector<PIntegerPair>& offset_flags, std::vector<PIntegerPair>* offsets,
        int timeout) {
    DBUG_EXECUTE_IF("KafkaDataConsumer.get_offsets_for_partitions.timeout", {
        // sleep 61s
        std::this_thread::sleep_for(std::chrono::seconds(61));
    });

    constexpr int64_t OFFSET_END_VAL = -1;
    constexpr int64_t OFFSET_BEGINNING_VAL = -2;

    MonotonicStopWatch watch;
    watch.start();
    for (const auto& entry : offset_flags) {
        const int64_t flag = entry.val();

        PIntegerPair pair;
        pair.set_key(entry.key());
        if (UNLIKELY(flag >= 0)) {
            // already a real offset, nothing to resolve
            pair.set_val(flag);
            offsets->push_back(std::move(pair));
            continue;
        }
        if (UNLIKELY(flag != OFFSET_END_VAL && flag != OFFSET_BEGINNING_VAL)) {
            // Fail before talking to the broker instead of emitting a pair
            // whose value was never set.
            return Status::InternalError("unsupported kafka offset flag {} for partition {}", flag,
                                         entry.key());
        }

        int64_t low = 0;
        int64_t high = 0;
        // elapsed_time() is divided by 1000 twice to obtain milliseconds.
        auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000);
        if (UNLIKELY(timeout_ms <= 0)) {
            return Status::InternalError("get kafka real offsets for partitions timeout");
        }

        RdKafka::ErrorCode err =
                _k_consumer->query_watermark_offsets(_topic, entry.key(), &low, &high, timeout_ms);
        if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
            std::stringstream ss;
            ss << "failed to get latest offset for partition: " << entry.key()
               << ", err: " << RdKafka::err2str(err);
            LOG(WARNING) << ss.str();
            return Status::InternalError(ss.str());
        }

        pair.set_val(flag == OFFSET_END_VAL ? high : low);
        offsets->push_back(std::move(pair));
    }
    return Status::OK();
}
// Flags this consumer as cancelled; group_consume() checks the flag at the top
// of every loop iteration. Returns an InternalError when the consumer has not
// been initialized. `ctx` is not used here.
Status KafkaDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) {
    std::lock_guard<std::mutex> guard(_lock);
    if (_init) {
        _cancelled = true;
        LOG(INFO) << "kafka consumer cancelled. " << _id;
        return Status::OK();
    }
    return Status::InternalError("consumer is not initialized");
}
// Puts the consumer back into a reusable state: clears the cancelled flag,
// drops the current partition assignment and refreshes the last visit time.
Status KafkaDataConsumer::reset() {
    std::lock_guard<std::mutex> guard(_lock);
    _cancelled = false;
    _k_consumer->unassign();
    // reset() is called before this consumer is returned to the pool,
    // which makes it a reasonable place to update _last_visit_time.
    _last_visit_time = time(nullptr);
    return Status::OK();
}
// Commits the given offsets asynchronously.
// Returns an InternalError when commitAsync() reports an error code.
Status KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset) {
    // The async variant is used so that the commit does not block for a long time.
    // A failed commit has no effect on Doris: subsequent tasks will continue to
    // commit the new offset.
    const RdKafka::ErrorCode commit_err = _k_consumer->commitAsync(offset);
    if (commit_err == RdKafka::ERR_NO_ERROR) {
        return Status::OK();
    }
    return Status::InternalError("failed to commit kafka offset : {}",
                                 RdKafka::err2str(commit_err));
}
// Tells whether this consumer can be reused for `ctx`.
// The consumer matches when the load source is Kafka, the brokers and the
// topic are the same, and the custom properties match as well.
bool KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
    const bool is_kafka_source = ctx->load_src_type == TLoadSourceType::KAFKA;
    if (!is_kafka_source) {
        return false;
    }

    const auto& kafka_info = *ctx->kafka_info;
    const bool same_endpoint = _brokers == kafka_info.brokers && _topic == kafka_info.topic;
    if (!same_endpoint) {
        return false;
    }

    // check properties
    return PropertyMatcher::properties_match(_custom_properties, kafka_info.properties);
}
// ==================== AWS Kinesis Data Consumer Implementation ====================
// Captures region, stream name and endpoint from ctx->kinesis_info.
// No AWS client is created here; that happens in init().
KinesisDataConsumer::KinesisDataConsumer(std::shared_ptr<StreamLoadContext> ctx)
        : _region(ctx->kinesis_info->region),
          _stream(ctx->kinesis_info->stream),
          _endpoint(ctx->kinesis_info->endpoint) {
    VLOG_NOTICE << "construct Kinesis consumer: stream=" << _stream
                << ", region=" << _region;
}
// Only logs the destruction. The AWS SDK client is held by a shared_ptr and
// needs no explicit cleanup here.
KinesisDataConsumer::~KinesisDataConsumer() {
    VLOG_NOTICE << "destruct Kinesis consumer: stream=" << _stream;
}
// Initializes the Kinesis consumer: stores the job's custom properties,
// interprets the "aws.kinesis.*" ones and creates the AWS Kinesis client.
//
// "aws.kinesis.*" properties fall into two groups once the prefix is removed:
//   - explicit members: "shards" (comma separated list), "default.pos" and
//     "shards.pos.<shard_id>";
//   - everything else is handed to KinesisConf::set(); CONF_INVALID aborts the
//     initialization while CONF_UNKNOWN is tolerated.
//
// Returns OK when the consumer is (or already was) initialized.
Status KinesisDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
    std::unique_lock<std::mutex> l(_lock);
    if (_init) {
        return Status::OK(); // Already initialized (idempotent)
    }

    // Store custom properties (AWS credentials, etc.)
    const auto& job_properties = ctx->kinesis_info->properties;
    _custom_properties.insert(job_properties.begin(), job_properties.end());

    // Create KinesisConf and configure it
    _kinesis_conf = std::make_unique<KinesisConf>();

    const std::string kinesis_prefix = "aws.kinesis.";
    const std::string shard_pos_prefix = "shards.pos.";
    std::string errstr;
    for (const auto& [prop_key, prop_val] : _custom_properties) {
        if (!starts_with(prop_key, kinesis_prefix)) {
            continue;
        }
        // Remove "aws.kinesis." prefix
        const std::string conf_key = prop_key.substr(kinesis_prefix.size());

        // Frequently-used parameters are kept in explicit members.
        if (conf_key == "shards") {
            std::vector<std::string> shard_list =
                    absl::StrSplit(prop_val, ",", absl::SkipWhitespace());
            _explicit_shards = std::move(shard_list);
            VLOG_NOTICE << "Set explicit shards: " << prop_val;
            continue;
        }
        if (conf_key == "default.pos") {
            _default_position = prop_val;
            VLOG_NOTICE << "Set default position: " << prop_val;
            continue;
        }
        if (starts_with(conf_key, shard_pos_prefix)) {
            // Remove "shards.pos." prefix
            const std::string shard_id = conf_key.substr(shard_pos_prefix.size());
            _shard_positions[shard_id] = prop_val;
            VLOG_NOTICE << "Set shard position: " << shard_id << " = " << prop_val;
            continue;
        }

        // Less-frequently-used API parameters (KinesisConf determines which API)
        const KinesisConf::ConfResult set_result = _kinesis_conf->set(conf_key, prop_val, errstr);
        if (set_result == KinesisConf::CONF_INVALID) {
            return Status::InternalError("Failed to set '{}': {}", conf_key, errstr);
        }
        // CONF_UNKNOWN is acceptable (parameter will be ignored)
    }

    // Create AWS Kinesis client
    RETURN_IF_ERROR(_create_kinesis_client(ctx));

    VLOG_NOTICE << "finished to init Kinesis consumer. stream=" << _stream << ", region=" << _region
                << ", " << ctx->brief();
    _init = true;
    return Status::OK();
}
// Builds the AWS Kinesis client used by this consumer.
//
// Credentials are resolved through S3ClientFactory's credential provider
// logic, fed from the "aws.*" custom properties (access key, secret key,
// session key, role arn, external id, credentials provider type). The original
// note on this code lists the supported methods as: simple AK/SK, IAM instance
// profile (EC2), STS assume role, session tokens, environment variables and
// the default credential chain.
//
// The client configuration uses _region, the optional _endpoint override, the
// first valid CA cert path from config::ca_cert_file_paths, and the optional
// "aws.request.timeout.ms" / "aws.connection.timeout.ms" properties.
//
// Returns an InternalError when a timeout property is not a valid integer.
// `ctx` is not used here.
Status KinesisDataConsumer::_create_kinesis_client(std::shared_ptr<StreamLoadContext> ctx) {
    S3ClientConf s3_conf;
    s3_conf.region = _region;
    s3_conf.endpoint = _endpoint;

    // Returns the property value, or an empty string when it is absent.
    auto get_property = [this](const char* key) -> std::string {
        auto it = _custom_properties.find(key);
        if (it != _custom_properties.end() && !it->second.empty()) {
            return it->second;
        }
        return "";
    };

    // Keep one naming convention aligned with FE-side Kinesis properties.
    s3_conf.ak = get_property("aws.access_key");
    s3_conf.sk = get_property("aws.secret_key");
    s3_conf.token = get_property("aws.session_key");
    s3_conf.role_arn = get_property("aws.role_arn");
    s3_conf.external_id = get_property("aws.external.id");

    const std::string provider = get_property("aws.credentials.provider");
    if (!provider.empty()) {
        // Map provider type string to enum
        if (provider == "instance_profile") {
            s3_conf.cred_provider_type = CredProviderType::InstanceProfile;
        } else if (provider == "env") {
            s3_conf.cred_provider_type = CredProviderType::Env;
        } else if (provider == "simple") {
            s3_conf.cred_provider_type = CredProviderType::Simple;
        } else {
            // The value is still ignored as before, but no longer silently.
            LOG(WARNING) << "Unrecognized aws.credentials.provider: " << provider
                         << ", keep the default credential provider type";
        }
    }

    // Create AWS ClientConfiguration
    Aws::Client::ClientConfiguration aws_config = S3ClientFactory::getClientConfiguration();
    aws_config.region = _region;
    if (!_endpoint.empty()) {
        aws_config.endpointOverride = _endpoint;
    }
    std::string ca_cert_file_path =
            get_valid_ca_cert_path(doris::split(config::ca_cert_file_paths, ";"));
    if (!ca_cert_file_path.empty()) {
        aws_config.caFile = ca_cert_file_path;
    }

    // Parses a timeout property; std::stol throws on invalid input, which is
    // converted into an error Status here.
    auto parse_timeout_ms = [](const std::string& timeout_value, const std::string& property_name,
                               long* timeout_ms) -> Status {
        try {
            *timeout_ms = std::stol(timeout_value);
        } catch (const std::exception&) {
            return Status::InternalError("Invalid value for {}: {}", property_name, timeout_value);
        }
        return Status::OK();
    };

    // Set timeouts from properties or use defaults
    auto it_request_timeout = _custom_properties.find("aws.request.timeout.ms");
    if (it_request_timeout != _custom_properties.end()) {
        RETURN_IF_ERROR(parse_timeout_ms(it_request_timeout->second, "aws.request.timeout.ms",
                                         &aws_config.requestTimeoutMs));
    } else {
        aws_config.requestTimeoutMs = 30000; // 30s default
    }
    // The connect timeout keeps whatever getClientConfiguration() provided
    // unless the property is present.
    auto it_conn_timeout = _custom_properties.find("aws.connection.timeout.ms");
    if (it_conn_timeout != _custom_properties.end()) {
        RETURN_IF_ERROR(parse_timeout_ms(it_conn_timeout->second, "aws.connection.timeout.ms",
                                         &aws_config.connectTimeoutMs));
    }

    // Get credentials provider (reuses S3 infrastructure)
    auto credentials_provider = S3ClientFactory::instance().get_aws_credentials_provider(s3_conf);

    // Create Kinesis client. std::make_shared never returns a null pointer (it
    // throws on allocation failure), so no null check is needed afterwards.
    _kinesis_client =
            std::make_shared<Aws::Kinesis::KinesisClient>(credentials_provider, aws_config);

    LOG(INFO) << "Created Kinesis client for stream: " << _stream << ", region: " << _region;
    return Status::OK();
}
// Registers the given shards for consumption.
//
// `shard_sequence_numbers` maps shard id -> sequence number. A shard iterator
// is requested for every entry, stored in _shard_iterators, and the shard id is
// added to _consuming_shard_ids. The first failing iterator request aborts the
// assignment and its error is returned; shards handled before it stay
// registered. `stream_name` and `ctx` are not used here.
Status KinesisDataConsumer::assign_shards(
        const std::map<std::string, std::string>& shard_sequence_numbers,
        const std::string& stream_name, std::shared_ptr<StreamLoadContext> ctx) {
    DORIS_CHECK(_kinesis_client);

    std::stringstream assignment_desc;
    assignment_desc << "Assigning shards to Kinesis consumer " << _id << ": ";
    for (const auto& [shard_id, sequence_number] : shard_sequence_numbers) {
        // Get shard iterator for this shard
        std::string shard_iterator;
        RETURN_IF_ERROR(_get_shard_iterator(shard_id, sequence_number, &shard_iterator));

        _shard_iterators[shard_id] = std::move(shard_iterator);
        _consuming_shard_ids.insert(shard_id);
        assignment_desc << "[" << shard_id << ": " << sequence_number << "] ";
    }
    LOG(INFO) << assignment_desc.str();
    return Status::OK();
}
// Requests a shard iterator for `shard_id` of _stream and stores it in
// `*iterator`.
//
// The request is populated by KinesisConf from the stream name, the shard id
// and `sequence_number`. Returns an InternalError when the request cannot be
// built or when the GetShardIterator call fails.
Status KinesisDataConsumer::_get_shard_iterator(const std::string& shard_id,
                                                const std::string& sequence_number,
                                                std::string* iterator) {
    // Apply all configurations through KinesisConf
    DCHECK(_kinesis_conf != nullptr);
    Aws::Kinesis::Model::GetShardIteratorRequest iterator_request;
    const Status apply_status = _kinesis_conf->apply_to_get_shard_iterator_request(
            iterator_request, _stream, shard_id, sequence_number);
    if (!apply_status.ok()) {
        return Status::InternalError(
                "Failed to apply Kinesis config to GetShardIteratorRequest: {}",
                apply_status.to_string());
    }

    auto iterator_outcome = _kinesis_client->GetShardIterator(iterator_request);
    if (iterator_outcome.IsSuccess()) {
        *iterator = iterator_outcome.GetResult().GetShardIterator();
        VLOG_NOTICE << "Got shard iterator for shard: " << shard_id;
        return Status::OK();
    }

    const auto& aws_error = iterator_outcome.GetError();
    return Status::InternalError("Failed to get shard iterator for shard {}: {} ({})", shard_id,
                                 aws_error.GetMessage(),
                                 static_cast<int>(aws_error.GetErrorType()));
}
// Reads records from all active shards in rounds and pushes them into `queue`.
//
// Every round walks _consuming_shard_ids once and issues one GetRecords call
// per shard. The loop ends when:
//   - the consumer is cancelled (_cancelled is set),
//   - `max_running_time_ms` has elapsed (checked between rounds),
//   - no active shard is left,
//   - a request cannot be built or a GetRecords call fails fatally.
//
// A shard leaves the active set when its iterator is missing or empty, when
// GetRecords returns no next iterator (the shard is then recorded in
// _closed_shard_ids), or when a call returns zero records; in the last case its
// iterator is kept in _shard_iterators.
//
// Throttled calls back off and move on to the next shard. Retriable errors are
// retried on the same shard until the retry policy gives up, after which they
// are treated as fatal.
//
// Returns OK, or the error that stopped the loop. A failure of
// _process_records() is returned immediately.
Status KinesisDataConsumer::group_consume(
        BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue,
        int64_t max_running_time_ms) {
    static constexpr int INTER_SHARD_SLEEP_MS = 10;           // Small sleep between shards
    static constexpr int MIN_INTERVAL_BETWEEN_ROUNDS_MS = 200; // Min 200ms between rounds

    int64_t left_time = max_running_time_ms;
    LOG(INFO) << "start Kinesis consumer: " << _id << ", grp: " << _grp_id
              << ", stream: " << _stream << ", max running time(ms): " << left_time;

    int64_t received_rows = 0;
    int64_t put_rows = 0;
    RetryPolicy retry_policy(3, 200);
    ThrottleBackoff throttle_backoff(1000, 10000);
    Status st = Status::OK();
    bool done = false;
    // consumer_watch only measures the GetRecords calls; watch measures the
    // whole call and drives the time budget.
    MonotonicStopWatch consumer_watch;
    MonotonicStopWatch watch;
    watch.start();

    while (true) {
        // Check cancellation flag
        {
            std::unique_lock<std::mutex> l(_lock);
            if (_cancelled) {
                break;
            }
        }
        if (left_time <= 0) {
            break;
        }

        // Round-robin through all active shards
        for (auto it = _consuming_shard_ids.begin(); it != _consuming_shard_ids.end() && !done;) {
            // NOTE: shard_id refers to the set element and must not be used
            // after the element has been erased.
            const std::string& shard_id = *it;
            auto iter_it = _shard_iterators.find(shard_id);
            if (iter_it == _shard_iterators.end() || iter_it->second.empty()) {
                // Shard exhausted (closed due to split/merge), remove from active set
                LOG(INFO) << "Shard exhausted: " << shard_id;
                it = _consuming_shard_ids.erase(it);
                continue;
            }

            consumer_watch.start();
            Aws::Kinesis::Model::GetRecordsRequest request;
            DCHECK(_kinesis_conf != nullptr);
            st = _kinesis_conf->apply_to_get_records_request(request, iter_it->second);
            if (!st.ok()) {
                LOG(WARNING) << "Failed to apply Kinesis config to GetRecordsRequest: " << st;
                done = true;
                break;
            }
            auto outcome = _kinesis_client->GetRecords(request);
            consumer_watch.stop();

            // Track generic routine load metrics and Kinesis-specific metrics.
            DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
            DorisMetrics::instance()->routine_load_get_msg_latency->increment(
                    consumer_watch.elapsed_time() / 1000 / 1000);
            DorisMetrics::instance()->routine_load_kinesis_get_records_count->increment(1);
            DorisMetrics::instance()->routine_load_kinesis_get_records_latency->increment(
                    consumer_watch.elapsed_time() / 1000 / 1000);

            if (!outcome.IsSuccess()) {
                auto& error = outcome.GetError();

                // Handle throttling (ProvisionedThroughputExceededException)
                if (error.GetErrorType() ==
                    Aws::Kinesis::KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED) {
                    DorisMetrics::instance()->routine_load_kinesis_throttle_count->increment(1);
                    LOG(INFO) << "Kinesis rate limit exceeded for shard: " << shard_id
                              << ", throttle_count: " << throttle_backoff.throttle_count()
                              << ", backing off";
                    throttle_backoff.backoff_and_sleep();
                    ++it; // Move to next shard, will retry this one next round
                    continue;
                }

                // Handle retriable errors
                if (_is_retriable_error(error)) {
                    DorisMetrics::instance()->routine_load_kinesis_retriable_error_count->increment(
                            1);
                    LOG(INFO) << "Kinesis retriable error for shard " << shard_id << ": "
                              << error.GetMessage()
                              << ", retry times: " << retry_policy.retry_count();
                    if (retry_policy.should_retry()) {
                        retry_policy.retry_with_backoff();
                        // `it` is not advanced: the same shard is retried.
                        continue;
                    }
                }

                // Fatal error
                LOG(WARNING) << "Kinesis consume failed for shard " << shard_id << ": "
                             << error.GetMessage() << " (" << static_cast<int>(error.GetErrorType())
                             << ")";
                st = Status::InternalError("Kinesis GetRecords failed for shard {}: {}", shard_id,
                                           error.GetMessage());
                done = true;
                break;
            }

            // Reset retry counter on success
            retry_policy.reset();
            throttle_backoff.reset();

            // Process records - move result to allow moving individual records.
            // Everything needed afterwards is read before the result is moved.
            auto result = outcome.GetResultWithOwnership();
            auto millis_behind = result.GetMillisBehindLatest();
            std::string next_iterator = result.GetNextShardIterator();
            size_t record_count = result.GetRecords().size();
            RETURN_IF_ERROR(_process_records(shard_id, std::move(result), queue, &received_rows,
                                             &put_rows));

            // Track MillisBehindLatest for this shard (used by FE for lag monitoring & scheduling)
            _millis_behind_latest[shard_id] = millis_behind;

            // Update shard iterator for next call
            if (next_iterator.empty()) {
                // Shard is closed (split/merge), mark as closed and remove from active set
                LOG(INFO) << "Shard closed: " << shard_id << " (split/merge detected)";
                DorisMetrics::instance()->routine_load_kinesis_closed_shard_count->increment(1);
                _closed_shard_ids.insert(shard_id);
                _shard_iterators.erase(shard_id);
                it = _consuming_shard_ids.erase(it);
            } else {
                // Update iterator for next consumption
                _shard_iterators[shard_id] = next_iterator;
                if (record_count == 0) {
                    // No records in this batch - shard has caught up with latest data
                    // Remove from active set for this round (similar to Kafka PARTITION_EOF)
                    // but keep iterator and progress for next task execution
                    LOG(INFO) << "Shard has no new data: " << shard_id
                              << " (MillisBehindLatest=" << millis_behind << ")";
                    it = _consuming_shard_ids.erase(it);
                } else {
                    ++it;
                }
            }

            // Check if all shards are exhausted
            if (_consuming_shard_ids.empty()) {
                LOG(INFO) << "All shards exhausted for consumer: " << _id;
                done = true;
                break;
            }

            // Small sleep to avoid tight loop
            std::this_thread::sleep_for(std::chrono::milliseconds(INTER_SHARD_SLEEP_MS));
        }

        // The active set can also become empty through the "Shard exhausted"
        // path above, or be empty on entry. Neither case reaches the in-loop
        // check, and without this one the consumer would idle in 200ms rounds
        // until max_running_time_ms expires.
        if (!done && _consuming_shard_ids.empty()) {
            LOG(INFO) << "All shards exhausted for consumer: " << _id;
            done = true;
        }

        // Ensure minimum interval between rounds to respect Kinesis rate limits (5 GetRecords/sec per shard)
        std::this_thread::sleep_for(std::chrono::milliseconds(MIN_INTERVAL_BETWEEN_ROUNDS_MS));

        // watch reports nanoseconds, hence / 1000 / 1000 for milliseconds
        left_time = max_running_time_ms - watch.elapsed_time() / 1000 / 1000;
        if (done) {
            break;
        }
    }

    LOG(INFO) << "Kinesis consumer done: " << _id << ", grp: " << _grp_id
              << ". cancelled: " << _cancelled << ", left time(ms): " << left_time
              << ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000
              << ", consume cost(ms): " << consumer_watch.elapsed_time() / 1000 / 1000
              << ", received rows: " << received_rows << ", put rows: " << put_rows;
    return st;
}
// Hands the records of one GetRecords result over to the consumer queue.
//
// For every record the payload length is first added to the consume-bytes
// metric; records with a zero-length payload are then skipped. For each
// remaining record the entry of `shard_id` in _committed_sequence_numbers is
// overwritten with the record's sequence number, the record is moved into a
// shared_ptr and offered to `queue`. `*put_rows`, `*received_rows` and the
// consume-rows metric are only bumped after the queue accepted the record.
//
// Returns InternalError as soon as the queue refuses a put, Status::OK() once
// the whole batch has been walked.
Status KinesisDataConsumer::_process_records(
        const std::string& shard_id, Aws::Kinesis::Model::GetRecordsResult result,
        BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue, int64_t* received_rows,
        int64_t* put_rows) {
    using KinesisRecord = Aws::Kinesis::Model::Record;

    // `result` is a by-value parameter, so dropping const from its record list and
    // stealing the storage cannot affect anything the caller still holds.
    auto& owned_records = const_cast<Aws::Vector<KinesisRecord>&>(result.GetRecords());
    Aws::Vector<KinesisRecord> batch = std::move(owned_records);

    for (size_t idx = 0; idx < batch.size(); ++idx) {
        KinesisRecord& current = batch[idx];
        const auto payload_len = current.GetData().GetLength();
        DorisMetrics::instance()->routine_load_consume_bytes->increment(payload_len);
        if (payload_len == 0) {
            // Nothing to load from an empty payload.
            continue;
        }

        // Remember the newest sequence number seen for this shard. This has to happen
        // before the record is moved out below.
        _committed_sequence_numbers[shard_id] = current.GetSequenceNumber();

        // Wrap by move so the payload buffer is not copied.
        auto shared_record = std::make_shared<KinesisRecord>(std::move(current));
        const bool accepted = queue->controlled_blocking_put(
                shared_record, config::blocking_queue_cv_wait_timeout_ms);
        if (!accepted) {
            // The queue has been shut down.
            return Status::InternalError("Queue shutdown during record processing");
        }

        ++(*put_rows);
        ++(*received_rows);
        DorisMetrics::instance()->routine_load_consume_rows->increment(1);
    }
    return Status::OK();
}
// Tells whether a failed Kinesis call is worth retrying.
//
// Returns true when the error type is one of the four types listed below;
// for any other type the answer is whatever the error's own ShouldRetry()
// reports.
bool KinesisDataConsumer::_is_retriable_error(
        const Aws::Client::AWSError<Aws::Kinesis::KinesisErrors>& error) {
    using Aws::Kinesis::KinesisErrors;

    const KinesisErrors actual = error.GetErrorType();
    for (const KinesisErrors retriable :
         {KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED, KinesisErrors::SERVICE_UNAVAILABLE,
          KinesisErrors::INTERNAL_FAILURE, KinesisErrors::NETWORK_CONNECTION}) {
        if (actual == retriable) {
            return true;
        }
    }
    // Not in the explicit list: defer to the error's own retry flag.
    return error.ShouldRetry();
}
// Clears the per-task state of this consumer while holding _lock: the cancelled
// flag is lowered, all shard bookkeeping containers are emptied and the
// last-visit timestamp is set to the current time. Always returns Status::OK().
Status KinesisDataConsumer::reset() {
    std::lock_guard<std::mutex> guard(_lock);

    _cancelled = false;
    _last_visit_time = time(nullptr);

    // Shard bookkeeping: active set, closed set, iterators, lag and progress.
    _consuming_shard_ids.clear();
    _closed_shard_ids.clear();
    _shard_iterators.clear();
    _millis_behind_latest.clear();
    _committed_sequence_numbers.clear();

    LOG(INFO) << "Kinesis consumer reset: " << _id;
    return Status::OK();
}
// Raises the cancelled flag of this consumer under _lock and logs the request
// together with ctx->brief().
//
// Returns InternalError when the consumer has not been initialized (the flag is
// left untouched in that case), Status::OK() otherwise.
Status KinesisDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) {
    std::lock_guard<std::mutex> guard(_lock);
    if (_init) {
        _cancelled = true;
        LOG(INFO) << "Kinesis consumer cancelled: " << _id << ", " << ctx->brief();
        return Status::OK();
    }
    return Status::InternalError("Kinesis consumer is not initialized");
}
// Reports whether this consumer was set up for the same Kinesis source as `ctx`.
//
// The context must be a KINESIS load, and its region, stream and endpoint must
// equal the ones stored in this consumer; if so, the result is whatever
// PropertyMatcher::properties_match returns for the two property sets.
bool KinesisDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
    if (ctx->load_src_type != TLoadSourceType::KINESIS) {
        return false;
    }

    const auto& wanted = *ctx->kinesis_info;
    const bool same_source = _region == wanted.region && _stream == wanted.stream &&
                             _endpoint == wanted.endpoint;
    if (!same_source) {
        return false;
    }

    // Same stream location: the custom properties decide.
    return PropertyMatcher::properties_match(_custom_properties, wanted.properties);
}
// Fills `*shard_ids` with the shards this consumer should read from.
//
// When explicit shards were configured they are returned as-is. Otherwise the
// stream is enumerated through paginated ListShards calls and only shards
// without an ending sequence number (i.e. still open) are collected.
//
// Returns InternalError when the request cannot be configured, when a
// ListShards call fails, or when the stream reported no shard at all;
// Status::OK() otherwise (the resulting list may be empty if every shard seen
// was closed).
Status KinesisDataConsumer::get_shard_list(std::vector<std::string>* shard_ids) {
    DORIS_CHECK(_kinesis_client);

    // Explicitly configured shards win over discovery.
    if (!_explicit_shards.empty()) {
        *shard_ids = _explicit_shards;
        LOG(INFO) << "Using " << shard_ids->size() << " explicit shards for stream: " << _stream;
        return Status::OK();
    }

    // Discovery path: build the first page request from the Kinesis config.
    Aws::Kinesis::Model::ListShardsRequest page_request;
    DCHECK(_kinesis_conf != nullptr);
    if (Status apply_st = _kinesis_conf->apply_to_list_shards_request(page_request, _stream);
        !apply_st.ok()) {
        return Status::InternalError("Failed to apply Kinesis config to ListShardsRequest: {}",
                                     apply_st.to_string());
    }

    // Only return OPEN shards here. FE will keep recently retired parent shards in its
    // closed list until they are fully drained, then remove them permanently. Returning
    // CLOSED shards from ListShards would make already-drained parents look newly discovered
    // and cause them to restart from TRIM_HORIZON.
    std::vector<std::string> open_shards;
    bool stream_has_shards = false;
    for (;;) {
        auto outcome = _kinesis_client->ListShards(page_request);
        if (!outcome.IsSuccess()) {
            const auto& failure = outcome.GetError();
            return Status::InternalError("Failed to list shards for stream {}: {} ({})", _stream,
                                         failure.GetMessage(),
                                         static_cast<int>(failure.GetErrorType()));
        }

        const auto& page = outcome.GetResult();
        const auto& page_shards = page.GetShards();
        stream_has_shards = stream_has_shards || !page_shards.empty();

        for (const auto& shard : page_shards) {
            // A shard without an ending sequence number has not been closed yet.
            if (shard.GetSequenceNumberRange().GetEndingSequenceNumber().empty()) {
                open_shards.emplace_back(shard.GetShardId());
            }
        }

        const Aws::String& continuation = page.GetNextToken();
        if (continuation.empty()) {
            break;
        }

        // AWS requires paginated ListShards requests to use NextToken instead of StreamName.
        Aws::Kinesis::Model::ListShardsRequest follow_up;
        follow_up.SetNextToken(continuation);
        if (page_request.MaxResultsHasBeenSet()) {
            follow_up.SetMaxResults(page_request.GetMaxResults());
        }
        page_request = std::move(follow_up);
    }

    // If no page carried any shard, the collected list is necessarily empty as well.
    if (!stream_has_shards) {
        return Status::InternalError("No shards found in Kinesis stream: {}", _stream);
    }

    *shard_ids = std::move(open_shards);
    LOG(INFO) << "Found " << shard_ids->size() << " open shards in stream: " << _stream;
    return Status::OK();
}
} // end namespace doris