blob: 7ae30b51729b40dbf8f3101467e999bd83759627 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <stdint.h>

#include <ctime>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "common/logging.h"
#include "common/status.h"
#include "librdkafka/rdkafkacpp.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/uid_util.h"
namespace doris {
template <typename T>
class BlockingQueue;
// Base class for a reusable data consumer (e.g. a Kafka consumer).
// Each consumer gets a unique id at construction and may later be attached
// to a consumer group via set_grp(). Consumers are intended to be cached
// and reused across loads: reset() must restore a clean state before reuse,
// and match() tells whether a cached consumer fits a new load context.
class DataConsumer {
public:
    DataConsumer()
            : _id(UniqueId::gen_uid()),
              _grp_id(UniqueId::gen_uid()),
              _has_grp(false),
              _init(false),
              _cancelled(false),
              _last_visit_time(0) {}

    virtual ~DataConsumer() = default;

    // init the consumer with the given parameters
    virtual Status init(std::shared_ptr<StreamLoadContext> ctx) = 0;
    // start consuming
    virtual Status consume(std::shared_ptr<StreamLoadContext> ctx) = 0;
    // cancel the consuming process.
    // if the consumer is not initialized, or the consuming
    // process is already finished, call cancel() will
    // return ERROR
    virtual Status cancel(std::shared_ptr<StreamLoadContext> ctx) = 0;
    // reset the data consumer before being reused
    virtual Status reset() = 0;
    // return true if the consumer matches the need described by ctx,
    // i.e. this cached consumer can be reused for that load
    virtual bool match(std::shared_ptr<StreamLoadContext> ctx) = 0;

    const UniqueId& id() const { return _id; }

    time_t last_visit_time() const { return _last_visit_time; }

    // attach this consumer to the given consumer group
    void set_grp(const UniqueId& grp_id) {
        _grp_id = grp_id;
        _has_grp = true;
    }

protected:
    UniqueId _id;
    UniqueId _grp_id;
    bool _has_grp;

    // lock to protect the following bools
    std::mutex _lock;
    bool _init;
    bool _cancelled;
    // NOTE(review): read and written without holding _lock; presumably a
    // benign race on a scalar timestamp — confirm if stricter sync is needed.
    time_t _last_visit_time;
};
class PIntegerPair;
// Callback hooked into librdkafka to surface client-side events
// (errors, statistics, logs, throttling notices) into our own log.
// Invoked by librdkafka from within its poll/consume calls.
class KafkaEventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event& event) override {
        switch (event.type()) {
        case RdKafka::Event::EVENT_ERROR:
            LOG(INFO) << "kafka error: " << RdKafka::err2str(event.err())
                      << ", event: " << event.str();
            break;
        case RdKafka::Event::EVENT_STATS:
            LOG(INFO) << "kafka stats: " << event.str();
            break;
        case RdKafka::Event::EVENT_LOG:
            LOG(INFO) << "kafka log-" << event.severity() << "-" << event.fac().c_str()
                      << ", event: " << event.str();
            break;
        case RdKafka::Event::EVENT_THROTTLE:
            LOG(INFO) << "kafka throttled: " << event.throttle_time() << "ms by "
                      << event.broker_name() << " id " << (int)event.broker_id();
            break;
        default:
            LOG(INFO) << "kafka event: " << event.type()
                      << ", err: " << RdKafka::err2str(event.err()) << ", event: " << event.str();
            break;
        }
    }
};
class KafkaDataConsumer : public DataConsumer {
public:
KafkaDataConsumer(std::shared_ptr<StreamLoadContext> ctx)
: _brokers(ctx->kafka_info->brokers), _topic(ctx->kafka_info->topic) {}
virtual ~KafkaDataConsumer() {
VLOG_NOTICE << "deconstruct consumer";
if (_k_consumer) {
_k_consumer->close();
delete _k_consumer;
_k_consumer = nullptr;
}
}
Status init(std::shared_ptr<StreamLoadContext> ctx) override;
// TODO(cmy): currently do not implement single consumer start method, using group_consume
Status consume(std::shared_ptr<StreamLoadContext> ctx) override { return Status::OK(); }
Status cancel(std::shared_ptr<StreamLoadContext> ctx) override;
// reassign partition topics
virtual Status reset() override;
bool match(std::shared_ptr<StreamLoadContext> ctx) override;
// commit kafka offset
Status commit(std::vector<RdKafka::TopicPartition*>& offset);
Status assign_topic_partitions(const std::map<int32_t, int64_t>& begin_partition_offset,
const std::string& topic,
std::shared_ptr<StreamLoadContext> ctx);
// start the consumer and put msgs to queue
Status group_consume(BlockingQueue<RdKafka::Message*>* queue, int64_t max_running_time_ms);
// get the partitions ids of the topic
Status get_partition_meta(std::vector<int32_t>* partition_ids);
// get offsets for times
Status get_offsets_for_times(const std::vector<PIntegerPair>& times,
std::vector<PIntegerPair>* offsets, int timeout);
// get latest offsets for partitions
Status get_latest_offsets_for_partitions(const std::vector<int32_t>& partition_ids,
std::vector<PIntegerPair>* offsets, int timeout);
// get offsets for times
Status get_real_offsets_for_partitions(const std::vector<PIntegerPair>& offset_flags,
std::vector<PIntegerPair>* offsets, int timeout);
private:
std::string _brokers;
std::string _topic;
std::unordered_map<std::string, std::string> _custom_properties;
std::set<int32_t> _consuming_partition_ids;
KafkaEventCb _k_event_cb;
RdKafka::KafkaConsumer* _k_consumer = nullptr;
};
} // end namespace doris