blob: eedbf10a15a9aa9f375f23267c5a243dfd14d2bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <memory>

#include <Core/Block.h>
#include <Core/Field.h>
#include <Processors/Chunk.h>
#include <Storages/SubstraitSource/FormatFile.h>
#include <Common/BlockTypeUtils.h>
namespace local_engine
{
// Forward declarations: only shared_ptr handles to these types are needed below.
class FormatFile;
class ColumnIndexFilter;

// Shared-ownership handle aliases used by the reader interfaces in this header.
using FormatFilePtr = std::shared_ptr<FormatFile>;
using ColumnIndexFilterPtr = std::shared_ptr<ColumnIndexFilter>;
/// Abstract base for readers that produce DB::Chunk objects for one FormatFile.
/// Subclasses implement pull(); cancellation state is tracked here and forwarded
/// to the onCancel() hook exactly once.
class BaseReader
{
public:
    /// @param file_            the file this reader is bound to (stored as `file`).
    /// @param to_read_header_  stored as `readHeader`.
    /// @param header_          converted with toShared() and exposed via getHeader()/getSharedHeader().
    explicit BaseReader(const FormatFilePtr & file_, const DB::Block & to_read_header_, const DB::Block & header_)
        : file(file_), readHeader(to_read_header_), outputHeader(toShared(header_))
    {
    }

    virtual ~BaseReader() = default;

    /// Marks the reader as cancelled. exchange() returns the previous flag value,
    /// so only the first caller invokes onCancel(); repeated calls are no-ops.
    void cancel()
    {
        const bool already_cancelled = is_cancelled.exchange(true, std::memory_order_acq_rel);
        if (!already_cancelled)
            onCancel();
    }

    /// Fills @p chunk with the next piece of output. The meaning of the return value is
    /// defined by subclasses (presumably `false` once no more data is available — confirm
    /// against the implementations).
    virtual bool pull(DB::Chunk & chunk) = 0;

    /// @return true once cancel() has been called.
    bool isCancelled() const { return is_cancelled.load(std::memory_order_acquire); }

    /// @return the output header passed to the constructor.
    const DB::Block & getHeader() const { return *outputHeader; }

    /// @return the shared handle to the output header.
    const DB::SharedHeader & getSharedHeader() const { return outputHeader; }

protected:
    /// Hook run at most once, from the first cancel() call. The default does nothing.
    virtual void onCancel() { }

    /// Defined out of line. NOTE(review): presumably appends virtual/partition columns to
    /// @p dataChunk; @p rowNum's role is not visible from this header — confirm in the .cpp.
    DB::Columns addVirtualColumn(DB::Chunk dataChunk, size_t rowNum = 0) const;

    FormatFilePtr file;
    DB::Block readHeader;
    DB::SharedHeader outputHeader;
    std::atomic<bool> is_cancelled{false};

    static DB::ColumnPtr createConstColumn(DB::DataTypePtr type, const DB::Field & field, size_t rows);
    static DB::ColumnPtr createPartitionColumn(const String & value, const DB::DataTypePtr & type, size_t rows);
    static DB::Field buildFieldFromString(const String & value, DB::DataTypePtr type);

public:
    /// A query such as "select count(*) from t" reads no columns at all; the only
    /// information needed is the number of rows. To handle such cases, blocks are built
    /// with a const virtual column whose size indicates how many rows they contain.
    static DB::Block buildRowCountHeader(const DB::Block & header);

    /// Factory: creates the reader for @p current_file. The concrete reader type chosen
    /// is decided in the out-of-line definition.
    static std::unique_ptr<BaseReader> create(
        const FormatFilePtr & current_file,
        const DB::Block & readHeader,
        const DB::Block & outputHeader,
        const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag,
        const ColumnIndexFilterPtr & column_index_filter);
};
/// Reader backed by a FormatFile input format. pull() is defined out of line;
/// doPull() obtains each raw chunk from the input format's generate().
class NormalFileReader : public BaseReader
{
public:
    NormalFileReader(
        const FormatFilePtr & file_,
        const DB::Block & to_read_header_,
        const DB::Block & output_header_,
        const FormatFile::InputFormatPtr & input_format_);

    ~NormalFileReader() override = default;

    bool pull(DB::Chunk & chunk) override;

protected:
    /// Returns the next chunk generated by the underlying input format.
    /// Virtual so subclasses can customise how raw chunks are fetched.
    virtual DB::Chunk doPull()
    {
        return input_format->generate();
    }

private:
    /// The input format this reader pulls from (supplied to the constructor).
    FormatFile::InputFormatPtr input_format;

    /// Cancellation hook: forwards the cancel request to the input format.
    void onCancel() override
    {
        input_format->cancel();
    }
};
/// Reader constructed from a file and a header only (no input format).
/// NOTE(review): presumably emits chunks made solely of constant/partition columns,
/// at most `block_size` rows at a time, until `remained_rows` reaches zero — confirm
/// against the out-of-line pull() implementation.
class ConstColumnsFileReader : public BaseReader
{
public:
    ConstColumnsFileReader(const FormatFilePtr & file_, const DB::Block & header_, size_t blockSize = DB::DEFAULT_BLOCK_SIZE);
    ~ConstColumnsFileReader() override = default;
    bool pull(DB::Chunk & chunk) override;

private:
    /// Rows still to be produced. Default member initializer guarantees a determinate
    /// value; any mem-initializer in the out-of-line constructor still takes precedence.
    size_t remained_rows = 0;
    /// Row budget per chunk, fixed at construction (from the `blockSize` argument, presumably).
    const size_t block_size;
};
}