blob: 67ccc0e1bda5d65ef64411a36409986ced2a554b [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <aws/kinesis/KinesisClient.h>
#include <aws/kinesis/model/GetRecordsRequest.h>
#include <aws/kinesis/model/GetRecordsResult.h>
#include <aws/kinesis/model/GetShardIteratorRequest.h>
#include <aws/kinesis/model/ListShardsRequest.h>
#include <aws/kinesis/model/Record.h>
#include <stdint.h>

#include <ctime>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "common/logging.h"
#include "common/status.h"
#include "librdkafka/rdkafkacpp.h"
#include "load/routine_load/kinesis_conf.h"
#include "load/stream_load/stream_load_context.h"
#include "runtime/aws_msk_iam_auth.h"
#include "util/uid_util.h"
namespace doris {
// Forward declaration only: the declarations in this header refer to
// BlockingQueue exclusively through pointers, so the full definition is not
// needed here.
template <typename T>
class BlockingQueue;
// Abstract base class for the data consumers declared in this file.
//
// Every instance is created with a freshly generated id and a freshly
// generated group id; set_grp() replaces the group id and records that a group
// has been assigned. The class owns a std::mutex, so it can be neither copied
// nor moved.
class DataConsumer {
public:
    DataConsumer() : _id(UniqueId::gen_uid()), _grp_id(UniqueId::gen_uid()) {}
    virtual ~DataConsumer() = default;

    // Initialize the consumer with the parameters carried by ctx.
    virtual Status init(std::shared_ptr<StreamLoadContext> ctx) = 0;

    // Start consuming.
    virtual Status consume(std::shared_ptr<StreamLoadContext> ctx) = 0;

    // Cancel the consuming process.
    // If the consumer is not initialized, or the consuming process has already
    // finished, calling cancel() returns an error status.
    virtual Status cancel(std::shared_ptr<StreamLoadContext> ctx) = 0;

    // Reset the data consumer before it is reused.
    virtual Status reset() = 0;

    // Return true if this consumer matches what ctx needs.
    virtual bool match(std::shared_ptr<StreamLoadContext> ctx) = 0;

    // Unique id generated for this consumer at construction.
    const UniqueId& id() const { return _id; }

    // Current value of _last_visit_time (0 until a subclass updates it).
    time_t last_visit_time() const { return _last_visit_time; }

    // Attach this consumer to the group identified by grp_id.
    void set_grp(const UniqueId& grp_id) {
        _grp_id = grp_id;
        _has_grp = true;
    }

protected:
    UniqueId _id;
    UniqueId _grp_id;
    // True once set_grp() has been called.
    bool _has_grp = false;

    // lock to protect the following bools
    std::mutex _lock;
    bool _init = false;
    bool _cancelled = false;

    time_t _last_visit_time = 0;
};
// Forward declaration only: PIntegerPair appears below solely inside
// std::vector<PIntegerPair> parameter types of member function declarations.
class PIntegerPair;
// RdKafka::EventCb implementation that writes every event it receives to the
// log at INFO level, with a message format that depends on the event type.
class KafkaEventCb : public RdKafka::EventCb {
public:
    // Log one librdkafka event.
    // Marked override so that a signature change in RdKafka::EventCb becomes a
    // compile error instead of silently leaving this callback unused.
    // NOTE(review): EVENT_ERROR is logged at INFO like every other event type;
    // confirm that this severity is intentional.
    void event_cb(RdKafka::Event& event) override {
        switch (event.type()) {
        case RdKafka::Event::EVENT_ERROR:
            LOG(INFO) << "kafka error: " << RdKafka::err2str(event.err())
                      << ", event: " << event.str();
            break;
        case RdKafka::Event::EVENT_STATS:
            LOG(INFO) << "kafka stats: " << event.str();
            break;
        case RdKafka::Event::EVENT_LOG:
            LOG(INFO) << "kafka log-" << event.severity() << "-" << event.fac().c_str()
                      << ", event: " << event.str();
            break;
        case RdKafka::Event::EVENT_THROTTLE:
            LOG(INFO) << "kafka throttled: " << event.throttle_time() << "ms by "
                      << event.broker_name() << " id " << static_cast<int>(event.broker_id());
            break;
        default:
            // Any event type not handled above: log its numeric type, error and text.
            LOG(INFO) << "kafka event: " << event.type()
                      << ", err: " << RdKafka::err2str(event.err()) << ", event: " << event.str();
            break;
        }
    }
};
// DataConsumer implementation backed by an RdKafka::KafkaConsumer.
// The broker list and topic are copied from ctx->kafka_info at construction.
class KafkaDataConsumer : public DataConsumer {
public:
    KafkaDataConsumer(std::shared_ptr<StreamLoadContext> ctx)
            : _brokers(ctx->kafka_info->brokers), _topic(ctx->kafka_info->topic) {}

    // Closes and frees the underlying RdKafka consumer when one exists.
    virtual ~KafkaDataConsumer() {
        VLOG_NOTICE << "deconstruct consumer";
        if (_k_consumer == nullptr) {
            return;
        }
        _k_consumer->close();
        delete _k_consumer;
        _k_consumer = nullptr;
    }

    // ---- DataConsumer interface ----

    Status init(std::shared_ptr<StreamLoadContext> ctx) override;

    // TODO(cmy): currently do not implement single consumer start method, using group_consume
    // Always returns Status::OK() without doing any work.
    Status consume(std::shared_ptr<StreamLoadContext> ctx) override { return Status::OK(); }

    Status cancel(std::shared_ptr<StreamLoadContext> ctx) override;

    // Reassign partition topics.
    Status reset() override;

    bool match(std::shared_ptr<StreamLoadContext> ctx) override;

    // ---- Kafka-specific operations ----

    // Commit the given Kafka offsets.
    Status commit(std::vector<RdKafka::TopicPartition*>& offset);

    // Takes a partition id -> begin offset map together with the topic name.
    // NOTE(review): the assignment logic itself lives in the .cpp; confirm details there.
    Status assign_topic_partitions(const std::map<int32_t, int64_t>& begin_partition_offset,
                                   const std::string& topic,
                                   std::shared_ptr<StreamLoadContext> ctx);

    // Start the consumer and put the received messages into queue.
    Status group_consume(BlockingQueue<RdKafka::Message*>* queue, int64_t max_running_time_ms);

    // Fetch the partition ids of the topic into partition_ids.
    Status get_partition_meta(std::vector<int32_t>* partition_ids);

    // Fetch the offsets that correspond to the given times.
    Status get_offsets_for_times(const std::vector<PIntegerPair>& times,
                                 std::vector<PIntegerPair>* offsets, int timeout);

    // Fetch the latest offsets of the given partitions.
    Status get_latest_offsets_for_partitions(const std::vector<int32_t>& partition_ids,
                                             std::vector<PIntegerPair>* offsets, int timeout);

    // Fetch real offsets for the partitions described by offset_flags.
    // NOTE(review): the meaning of the flag values is defined in the .cpp; confirm there.
    Status get_real_offsets_for_partitions(const std::vector<PIntegerPair>& offset_flags,
                                           std::vector<PIntegerPair>* offsets, int timeout);

private:
    std::string _brokers;
    std::string _topic;
    std::unordered_map<std::string, std::string> _custom_properties;
    std::set<int32_t> _consuming_partition_ids;

    KafkaEventCb _k_event_cb;
    // Raw owning pointer; released in the destructor.
    RdKafka::KafkaConsumer* _k_consumer = nullptr;

    // AWS MSK IAM authentication callback (must outlive _k_consumer)
    std::unique_ptr<AwsMskIamOAuthCallback> _aws_msk_oauth_callback;
};
// AWS Kinesis Data Consumer
// Consumes data from AWS Kinesis Data Streams for routine load jobs.
// Kinesis is similar to Kafka but uses shards instead of partitions
// and sequence numbers (strings) instead of offsets (integers).
class KinesisDataConsumer : public DataConsumer {
public:
    KinesisDataConsumer(std::shared_ptr<StreamLoadContext> ctx);
    virtual ~KinesisDataConsumer();

    // ---- DataConsumer interface ----

    Status init(std::shared_ptr<StreamLoadContext> ctx) override;
    // Always returns Status::OK() without doing any work.
    Status consume(std::shared_ptr<StreamLoadContext> ctx) override { return Status::OK(); }
    Status cancel(std::shared_ptr<StreamLoadContext> ctx) override;
    Status reset() override;
    bool match(std::shared_ptr<StreamLoadContext> ctx) override;

    // ---- Kinesis-specific operations ----

    // Assign shards together with the sequence number each one starts from
    // (shard_sequence_numbers maps shard id -> starting sequence number).
    Status assign_shards(const std::map<std::string, std::string>& shard_sequence_numbers,
                         const std::string& stream_name, std::shared_ptr<StreamLoadContext> ctx);

    // Main consumption loop: pulls records from all assigned shards into queue.
    Status group_consume(BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue,
                         int64_t max_running_time_ms);

    // Fill shard_ids with the list of shard ids.
    Status get_shard_list(std::vector<std::string>* shard_ids);

    // ---- Read-only views of state collected during group_consume ----

    // MillisBehindLatest snapshot, keyed by shard id.
    const std::map<std::string, int64_t>& get_millis_behind_latest() const {
        return _millis_behind_latest;
    }

    // Committed sequence number per shard.
    const std::map<std::string, std::string>& get_committed_sequence_numbers() const {
        return _committed_sequence_numbers;
    }

    // Ids of the shards that were detected as closed.
    const std::set<std::string>& get_closed_shard_ids() const { return _closed_shard_ids; }

private:
    // ---- Helper methods ----

    // Create and configure the AWS Kinesis client with credentials.
    Status _create_kinesis_client(std::shared_ptr<StreamLoadContext> ctx);

    // Obtain the shard iterator of shard_id positioned at sequence_number.
    Status _get_shard_iterator(const std::string& shard_id, const std::string& sequence_number,
                               std::string* iterator);

    // Process the records of one GetRecords result and add them to queue.
    // NOTE(review): result is taken by value; the definition is not visible
    // here, so whether the copy is required should be confirmed in the .cpp.
    Status _process_records(const std::string& shard_id,
                            Aws::Kinesis::Model::GetRecordsResult result,
                            BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue,
                            int64_t* received_rows, int64_t* put_rows);

    // Tell whether an AWS error is retriable (throttling, network, etc.).
    bool _is_retriable_error(const Aws::Client::AWSError<Aws::Kinesis::KinesisErrors>& error);

    // ---- Configuration: basic AWS settings ----

    std::string _region;
    std::string _stream;
    std::string _endpoint; // Optional custom endpoint (e.g., LocalStack)

    // Type 1: Doris-internal parameters (not passed to AWS SDK)
    std::unordered_map<std::string, std::string> _doris_internal_properties;

    // Type 2: Frequently-used AWS parameters, kept as explicit members.
    // These are parsed from aws.kinesis.* properties during init().
    std::vector<std::string> _explicit_shards; // aws.kinesis.shards (comma-separated)
    std::string _default_position;             // aws.kinesis.default.pos (LATEST/TRIM_HORIZON)
    std::map<std::string, std::string>
            _shard_positions; // aws.kinesis.shards.pos (shard_id:position)

    // Type 3: Less-frequently-used AWS API parameters (wrapped in KinesisConf)
    std::unique_ptr<KinesisConf> _kinesis_conf;

    // AWS credentials and other properties
    std::unordered_map<std::string, std::string> _custom_properties;

    // ---- Runtime state ----

    // Shards currently being consumed.
    std::set<std::string> _consuming_shard_ids;

    // AWS Kinesis client
    std::shared_ptr<Aws::Kinesis::KinesisClient> _kinesis_client;

    // Kinesis requires a shard iterator to read records:
    // shard_id -> current shard iterator.
    std::map<std::string, std::string> _shard_iterators;

    // MillisBehindLatest value per shard from the last GetRecords call.
    // Updated during group_consume; read by the task executor to populate ctx
    // after consumption.
    std::map<std::string, int64_t> _millis_behind_latest;

    // Last consumed sequence number per shard.
    // Updated during group_consume via _process_records; read by the consumer
    // group to populate ctx->kinesis_info->cmt_sequence_number after consumption.
    std::map<std::string, std::string> _committed_sequence_numbers;

    // Shards that were closed (split/merge) during consumption.
    // FE should remove these shards from its tracking to avoid reassigning them.
    std::set<std::string> _closed_shard_ids;
};
} // end namespace doris