| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <aws/kinesis/KinesisClient.h> |
| #include <aws/kinesis/model/GetRecordsRequest.h> |
| #include <aws/kinesis/model/GetRecordsResult.h> |
| #include <aws/kinesis/model/GetShardIteratorRequest.h> |
| #include <aws/kinesis/model/ListShardsRequest.h> |
| #include <aws/kinesis/model/Record.h> |
| #include <stdint.h> |
| |
| #include <ctime> |
| #include <map> |
| #include <memory> |
| #include <mutex> |
| #include <ostream> |
| #include <string> |
| #include <unordered_map> |
| #include <vector> |
| |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "librdkafka/rdkafkacpp.h" |
| #include "load/routine_load/kinesis_conf.h" |
| #include "load/stream_load/stream_load_context.h" |
| #include "runtime/aws_msk_iam_auth.h" |
| #include "util/uid_util.h" |
| |
| namespace doris { |
| |
| template <typename T> |
| class BlockingQueue; |
| |
| class DataConsumer { |
| public: |
| DataConsumer() |
| : _id(UniqueId::gen_uid()), |
| _grp_id(UniqueId::gen_uid()), |
| _has_grp(false), |
| _init(false), |
| _cancelled(false), |
| _last_visit_time(0) {} |
| |
| virtual ~DataConsumer() {} |
| |
| // init the consumer with the given parameters |
| virtual Status init(std::shared_ptr<StreamLoadContext> ctx) = 0; |
| // start consuming |
| virtual Status consume(std::shared_ptr<StreamLoadContext> ctx) = 0; |
| // cancel the consuming process. |
| // if the consumer is not initialized, or the consuming |
| // process is already finished, call cancel() will |
| // return ERROR |
| virtual Status cancel(std::shared_ptr<StreamLoadContext> ctx) = 0; |
| // reset the data consumer before being reused |
| virtual Status reset() = 0; |
| // return true the if the consumer match the need |
| virtual bool match(std::shared_ptr<StreamLoadContext> ctx) = 0; |
| |
| const UniqueId& id() { return _id; } |
| time_t last_visit_time() { return _last_visit_time; } |
| void set_grp(const UniqueId& grp_id) { |
| _grp_id = grp_id; |
| _has_grp = true; |
| } |
| |
| protected: |
| UniqueId _id; |
| UniqueId _grp_id; |
| bool _has_grp; |
| |
| // lock to protect the following bools |
| std::mutex _lock; |
| bool _init; |
| bool _cancelled; |
| time_t _last_visit_time; |
| }; |
| |
| class PIntegerPair; |
| |
| class KafkaEventCb : public RdKafka::EventCb { |
| public: |
| void event_cb(RdKafka::Event& event) { |
| switch (event.type()) { |
| case RdKafka::Event::EVENT_ERROR: |
| LOG(INFO) << "kafka error: " << RdKafka::err2str(event.err()) |
| << ", event: " << event.str(); |
| break; |
| case RdKafka::Event::EVENT_STATS: |
| LOG(INFO) << "kafka stats: " << event.str(); |
| break; |
| |
| case RdKafka::Event::EVENT_LOG: |
| LOG(INFO) << "kafka log-" << event.severity() << "-" << event.fac().c_str() |
| << ", event: " << event.str(); |
| break; |
| |
| case RdKafka::Event::EVENT_THROTTLE: |
| LOG(INFO) << "kafka throttled: " << event.throttle_time() << "ms by " |
| << event.broker_name() << " id " << (int)event.broker_id(); |
| break; |
| |
| default: |
| LOG(INFO) << "kafka event: " << event.type() |
| << ", err: " << RdKafka::err2str(event.err()) << ", event: " << event.str(); |
| break; |
| } |
| } |
| }; |
| |
| class KafkaDataConsumer : public DataConsumer { |
| public: |
| KafkaDataConsumer(std::shared_ptr<StreamLoadContext> ctx) |
| : _brokers(ctx->kafka_info->brokers), _topic(ctx->kafka_info->topic) {} |
| |
| virtual ~KafkaDataConsumer() { |
| VLOG_NOTICE << "deconstruct consumer"; |
| if (_k_consumer) { |
| _k_consumer->close(); |
| delete _k_consumer; |
| _k_consumer = nullptr; |
| } |
| } |
| |
| Status init(std::shared_ptr<StreamLoadContext> ctx) override; |
| // TODO(cmy): currently do not implement single consumer start method, using group_consume |
| Status consume(std::shared_ptr<StreamLoadContext> ctx) override { return Status::OK(); } |
| Status cancel(std::shared_ptr<StreamLoadContext> ctx) override; |
| // reassign partition topics |
| virtual Status reset() override; |
| bool match(std::shared_ptr<StreamLoadContext> ctx) override; |
| // commit kafka offset |
| Status commit(std::vector<RdKafka::TopicPartition*>& offset); |
| |
| Status assign_topic_partitions(const std::map<int32_t, int64_t>& begin_partition_offset, |
| const std::string& topic, |
| std::shared_ptr<StreamLoadContext> ctx); |
| |
| // start the consumer and put msgs to queue |
| Status group_consume(BlockingQueue<RdKafka::Message*>* queue, int64_t max_running_time_ms); |
| |
| // get the partitions ids of the topic |
| Status get_partition_meta(std::vector<int32_t>* partition_ids); |
| // get offsets for times |
| Status get_offsets_for_times(const std::vector<PIntegerPair>& times, |
| std::vector<PIntegerPair>* offsets, int timeout); |
| // get latest offsets for partitions |
| Status get_latest_offsets_for_partitions(const std::vector<int32_t>& partition_ids, |
| std::vector<PIntegerPair>* offsets, int timeout); |
| // get offsets for times |
| Status get_real_offsets_for_partitions(const std::vector<PIntegerPair>& offset_flags, |
| std::vector<PIntegerPair>* offsets, int timeout); |
| |
| private: |
| std::string _brokers; |
| std::string _topic; |
| std::unordered_map<std::string, std::string> _custom_properties; |
| std::set<int32_t> _consuming_partition_ids; |
| |
| KafkaEventCb _k_event_cb; |
| RdKafka::KafkaConsumer* _k_consumer = nullptr; |
| |
| // AWS MSK IAM authentication callback (must outlive _k_consumer) |
| std::unique_ptr<AwsMskIamOAuthCallback> _aws_msk_oauth_callback; |
| }; |
| |
| // AWS Kinesis Data Consumer |
| // Consumes data from AWS Kinesis Data Streams for routine load jobs. |
| // Kinesis is similar to Kafka but uses shards instead of partitions |
| // and sequence numbers (strings) instead of offsets (integers). |
| class KinesisDataConsumer : public DataConsumer { |
| public: |
| KinesisDataConsumer(std::shared_ptr<StreamLoadContext> ctx); |
| virtual ~KinesisDataConsumer(); |
| |
| // DataConsumer interface implementation |
| Status init(std::shared_ptr<StreamLoadContext> ctx) override; |
| Status consume(std::shared_ptr<StreamLoadContext> ctx) override { return Status::OK(); } |
| Status cancel(std::shared_ptr<StreamLoadContext> ctx) override; |
| Status reset() override; |
| bool match(std::shared_ptr<StreamLoadContext> ctx) override; |
| |
| // Kinesis-specific methods |
| // Assign shards with their starting sequence numbers |
| Status assign_shards(const std::map<std::string, std::string>& shard_sequence_numbers, |
| const std::string& stream_name, std::shared_ptr<StreamLoadContext> ctx); |
| |
| // Main consumption loop - pulls records from all assigned shards |
| Status group_consume(BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue, |
| int64_t max_running_time_ms); |
| |
| // Get list of shard IDs |
| Status get_shard_list(std::vector<std::string>* shard_ids); |
| |
| private: |
| // Configuration - Basic AWS settings |
| std::string _region; |
| std::string _stream; |
| std::string _endpoint; // Optional custom endpoint (e.g., LocalStack) |
| |
| // Type 1: Doris-internal parameters (not passed to AWS SDK) |
| std::unordered_map<std::string, std::string> _doris_internal_properties; |
| |
| // Type 2: Frequently-used AWS parameters (explicit members for performance) |
| // These are parsed from aws.kinesis.* properties during init() |
| std::vector<std::string> _explicit_shards; // aws.kinesis.shards (comma-separated) |
| std::string _default_position; // aws.kinesis.default.pos (LATEST/TRIM_HORIZON) |
| std::map<std::string, std::string> |
| _shard_positions; // aws.kinesis.shards.pos (shard_id:position) |
| |
| // Type 3: Less-frequently-used AWS API parameters (wrapped in KinesisConf) |
| std::unique_ptr<KinesisConf> _kinesis_conf; |
| |
| // AWS credentials and other properties |
| std::unordered_map<std::string, std::string> _custom_properties; |
| |
| // Active shards being consumed |
| std::set<std::string> _consuming_shard_ids; |
| |
| // AWS Kinesis client |
| std::shared_ptr<Aws::Kinesis::KinesisClient> _kinesis_client; |
| |
| // Shard iterator management |
| // Kinesis requires shard iterators to consume records |
| // shard_id -> current shard iterator |
| std::map<std::string, std::string> _shard_iterators; |
| |
| // Tracks the MillisBehindLatest value per shard from the last GetRecords call. |
| // Updated during group_consume; read by the task executor to populate ctx after consumption. |
| std::map<std::string, int64_t> _millis_behind_latest; |
| |
| // Tracks the last consumed sequence number per shard. |
| // Updated during group_consume via _process_records; read by the consumer group |
| // to populate ctx->kinesis_info->cmt_sequence_number after consumption. |
| std::map<std::string, std::string> _committed_sequence_numbers; |
| |
| // Tracks shards that have been closed (split/merge) during consumption. |
| // FE should remove these shards from its tracking to avoid reassigning them. |
| std::set<std::string> _closed_shard_ids; |
| |
| public: |
| // Returns the MillisBehindLatest snapshot collected during group_consume. |
| const std::map<std::string, int64_t>& get_millis_behind_latest() const { |
| return _millis_behind_latest; |
| } |
| |
| // Returns the committed sequence numbers per shard collected during group_consume. |
| const std::map<std::string, std::string>& get_committed_sequence_numbers() const { |
| return _committed_sequence_numbers; |
| } |
| |
| // Returns the set of closed shard IDs detected during group_consume. |
| const std::set<std::string>& get_closed_shard_ids() const { return _closed_shard_ids; } |
| |
| private: |
| // Helper methods |
| // Create and configure AWS Kinesis client with credentials |
| Status _create_kinesis_client(std::shared_ptr<StreamLoadContext> ctx); |
| |
| // Get shard iterator for a shard at a specific sequence number position |
| Status _get_shard_iterator(const std::string& shard_id, const std::string& sequence_number, |
| std::string* iterator); |
| |
| // Process records from GetRecords result and add to queue |
| Status _process_records(const std::string& shard_id, |
| Aws::Kinesis::Model::GetRecordsResult result, |
| BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue, |
| int64_t* received_rows, int64_t* put_rows); |
| |
| // Check if an AWS error is retriable (throttling, network, etc.) |
| bool _is_retriable_error(const Aws::Client::AWSError<Aws::Kinesis::KinesisErrors>& error); |
| }; |
| |
| } // end namespace doris |