blob: cb45e4f3ebb02a290a15d4923768333bdd4d35de [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "StreamKafkaRelParser.h"
#include <Parser/SubstraitParserUtils.h>
#include <Parser/TypeParser.h>
#include <Storages/Kafka/ReadFromGlutenStorageKafka.h>
#include <Common/BlockTypeUtils.h>
#include <Common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NO_SUCH_DATA_PART;
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_FUNCTION;
extern const int UNKNOWN_TYPE;
}
}
namespace local_engine
{
DB::QueryPlanPtr
StreamKafkaRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_)
{
if (rel.has_read())
return parseRelImpl(std::move(query_plan), rel.read());
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "StreamKafkaRelParser can't parse rel:{}", rel.ShortDebugString());
}
DB::QueryPlanPtr StreamKafkaRelParser::parseRelImpl(DB::QueryPlanPtr query_plan, const substrait::ReadRel & read_rel)
{
if (!read_rel.has_stream_kafka())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Can't not parse kafka rel, because of read rel don't contained stream kafka");
auto kafka_task = BinaryToMessage<substrait::ReadRel::StreamKafka>(split_info);
auto topic = kafka_task.topic_partition().topic();
auto partition = kafka_task.topic_partition().partition();
auto start_offset = kafka_task.start_offset();
auto end_offset = kafka_task.end_offset();
auto poll_timeout_ms = kafka_task.poll_timeout_ms();
String group_id;
String brokers;
for (auto param : kafka_task.params())
if (param.first == "poll_timeout_ms")
poll_timeout_ms = std::stoi(param.second);
else if (param.first == "group.id")
group_id = param.second;
else if (param.first == "bootstrap.servers")
brokers = param.second;
else
LOG_DEBUG(getLogger("StreamKafkaRelParser"), "Unused kafka parameter: {}: {}", param.first, param.second);
LOG_INFO(
getLogger("StreamKafkaRelParser"),
"Kafka source: topic: {}, partition: {}, start_offset: {}, end_offset: {}",
topic,
partition,
start_offset,
end_offset);
if (group_id.empty())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Kafka group.id is not set");
Names topics;
topics.emplace_back(topic);
auto header = toShared(TypeParser::buildBlockFromNamedStruct(read_rel.base_schema()));
Names names = header->getNames();
auto source = std::make_unique<ReadFromGlutenStorageKafka>(
names, header, getContext(), topics, partition, start_offset, end_offset, poll_timeout_ms, group_id, brokers);
steps.emplace_back(source.get());
query_plan->addStep(std::move(source));
return query_plan;
}
}