// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/routine_load/data_consumer_group.h"

#include <gen_cpp/PlanNodes_types.h>
#include <stddef.h>

#include <map>
#include <ostream>
#include <string>
#include <utility>

#include "common/logging.h"
#include "librdkafka/rdkafkacpp.h"
#include "runtime/routine_load/data_consumer.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/stopwatch.hpp"

namespace doris {
#include "common/compile_check_begin.h"

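// Split the topic partitions (and their begin offsets) in ctx->kafka_info->begin_offset
// round-robin across the consumers of this group, then hand each consumer its share
// together with the topic.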
Status KafkaDataConsumerGroup::assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx) {
    DCHECK(ctx->kafka_info);
    DCHECK(_consumers.size() >= 1);

    // divide partitions among consumers round-robin
    int consumer_size = doris::cast_set<int>(_consumers.size());
    std::vector<std::map<int32_t, int64_t>> divide_parts(consumer_size);
    int i = 0;
    for (auto& kv : ctx->kafka_info->begin_offset) {
        int idx = i % consumer_size;
        divide_parts[idx].emplace(kv.first, kv.second);
        i++;
    }

    // assign each consumer its share of partitions
    for (int j = 0; j < consumer_size; ++j) {
        RETURN_IF_ERROR(
                std::static_pointer_cast<KafkaDataConsumer>(_consumers[j])
                        ->assign_topic_partitions(divide_parts[j], ctx->kafka_info->topic, ctx));
    }

    return Status::OK();
}

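// Drain and free any messages that the consumer threads produced but start_all() never consumed.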
KafkaDataConsumerGroup::~KafkaDataConsumerGroup() {
    // free the msgs left in the queue
    _queue.shutdown();
    while (true) {
        RdKafka::Message* msg;
        if (_queue.blocking_get(&msg)) {
            delete msg;
            msg = nullptr;
        } else {
            break;
        }
    }
    DCHECK(_queue.get_size() == 0);
}

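// Start one thread per consumer; each thread pushes the RdKafka messages it receives into the
// shared blocking queue. The calling thread then drains the queue and appends the message
// payloads to the stream load pipe until all consumers finish or the batch limits are reached.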
Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
                                         std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
    Status result_st = Status::OK();
    // start all consumers
    for (auto& consumer : _consumers) {
        if (!_thread_pool.offer(std::bind<void>(
                    &KafkaDataConsumerGroup::actual_consume, this, consumer, &_queue,
                    ctx->max_interval_s * 1000, [this, &result_st](const Status& st) {
                        // finish callback: the last consumer to finish shuts down the queue,
                        // and the first error reported becomes the result of this batch
                        std::unique_lock<std::mutex> lock(_mutex);
                        _counter--;
                        VLOG_CRITICAL << "group counter is: " << _counter << ", grp: " << _grp_id;
                        if (_counter == 0) {
                            _queue.shutdown();
                            LOG(INFO) << "all consumers are finished. shutdown queue. group id: "
                                      << _grp_id;
                        }
                        if (result_st.ok() && !st.ok()) {
                            result_st = st;
                        }
                    }))) {
            LOG(WARNING) << "failed to submit data consumer: " << consumer->id()
                         << ", group id: " << _grp_id;
            return Status::InternalError("failed to submit data consumer");
        } else {
            VLOG_CRITICAL << "submit a data consumer: " << consumer->id()
                          << ", group id: " << _grp_id;
        }
    }

    // consume from the queue and append the data to the stream load pipe
    int64_t left_time = ctx->max_interval_s * 1000;
    int64_t left_rows = ctx->max_batch_rows;
    int64_t left_bytes = ctx->max_batch_size;

    LOG(INFO) << "start consumer group: " << _grp_id << ". max time(ms): " << left_time
              << ", batch rows: " << left_rows << ", batch size: " << left_bytes << ". "
              << ctx->brief();

    // work on a local copy of the committed offsets; it is written back to ctx only on success
    std::map<int32_t, int64_t> cmt_offset = ctx->kafka_info->cmt_offset;

    // choose the append function once up front instead of branching on the format per message
    Status (io::KafkaConsumerPipe::*append_data)(const char* data, size_t size);
    if (ctx->format == TFileFormatType::FORMAT_JSON) {
        append_data = &io::KafkaConsumerPipe::append_json;
    } else {
        append_data = &io::KafkaConsumerPipe::append_with_line_delimiter;
    }

    MonotonicStopWatch watch;
    watch.start();
    bool eos = false;
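    // main consumption loop: finish the batch once eos is set (all consumers are done or an
    // append failed) or the time / row / byte budget of this batch is used up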
    while (true) {
        if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <= 0) {
            LOG(INFO) << "consumer group done: " << _grp_id
                      << ". consume time(ms)=" << ctx->max_interval_s * 1000 - left_time
                      << ", received rows=" << ctx->max_batch_rows - left_rows
                      << ", received bytes=" << ctx->max_batch_size - left_bytes << ", eos: " << eos
                      << ", left_time: " << left_time << ", left_rows: " << left_rows
                      << ", left_bytes: " << left_bytes
                      << ", blocking get time(us): " << _queue.total_get_wait_time() / 1000
                      << ", blocking put time(us): " << _queue.total_put_wait_time() / 1000 << ", "
                      << ctx->brief();

            // shut down the queue
            _queue.shutdown();
            // cancel all consumers
            for (auto& consumer : _consumers) {
                static_cast<void>(consumer->cancel(ctx));
            }

            // wait for all consumer threads to finish
            _thread_pool.shutdown();
            _thread_pool.join();
            if (!result_st.ok()) {
                kafka_pipe->cancel(result_st.to_string());
                return result_st;
            }
            static_cast<void>(kafka_pipe->finish());
            // the batch succeeded, so publish the locally tracked offsets
            ctx->kafka_info->cmt_offset = std::move(cmt_offset);
            ctx->receive_bytes = ctx->max_batch_size - left_bytes;
            return Status::OK();
        }

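        // get the next Kafka message from the shared queue, waiting at most
        // config::blocking_queue_cv_wait_timeout_ms for one to arrive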
        RdKafka::Message* msg;
        bool res = _queue.controlled_blocking_get(&msg, config::blocking_queue_cv_wait_timeout_ms);
        if (res) {
            // the msg has to be deleted eventually, no matter how it is handled
            Defer delete_msg {[msg]() { delete msg; }};
            VLOG_NOTICE << "get kafka message"
                        << ", partition: " << msg->partition() << ", offset: " << msg->offset()
                        << ", len: " << msg->len();

            if (msg->err() == RdKafka::ERR__PARTITION_EOF) {
                // reached the end of this partition; just remember the last consumed offset
                if (msg->offset() > 0) {
                    cmt_offset[msg->partition()] = msg->offset() - 1;
                }
            } else {
                Status st = (kafka_pipe.get()->*append_data)(
                        static_cast<const char*>(msg->payload()), static_cast<size_t>(msg->len()));
                if (st.ok()) {
                    left_rows--;
                    left_bytes -= msg->len();
                    cmt_offset[msg->partition()] = msg->offset();
                    VLOG_NOTICE << "consume partition[" << msg->partition() << " - "
                                << msg->offset() << "]";
                } else {
                    // failed to append this msg, so the whole batch must stop
                    LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id;
                    eos = true;
                    {
                        std::unique_lock<std::mutex> lock(_mutex);
                        if (result_st.ok()) {
                            result_st = st;
                        }
                    }
                }
            }
        } else {
            // the queue has been shut down and is empty
            eos = true;
        }

        left_time = ctx->max_interval_s * 1000 - watch.elapsed_time() / 1000 / 1000;
    }

    return Status::OK();
}

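// Body of every consumer thread: run the consumer's group_consume() to feed messages into the
// shared queue for at most max_running_time_ms, then report the final status via the callback.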
void KafkaDataConsumerGroup::actual_consume(std::shared_ptr<DataConsumer> consumer,
                                            BlockingQueue<RdKafka::Message*>* queue,
                                            int64_t max_running_time_ms, ConsumeFinishCallback cb) {
    Status st = std::static_pointer_cast<KafkaDataConsumer>(consumer)->group_consume(
            queue, max_running_time_ms);
    cb(st);
}
#include "common/compile_check_end.h"

} // namespace doris