blob: f4dbebb5af72618ec752de4937d93e7e9f24979b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "OrcUtil.h"
#include <memory>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <Common/Exception.h>
/// Evaluate the arrow::Status expression `s` exactly once; if it is not OK, throw orc::ParseError
/// whose message is "Arrow error: " followed by the status text. This turns arrow failures into
/// ORC exceptions inside code that is called by the ORC reader (e.g. ArrowInputFile below).
#define ORC_THROW_NOT_OK(s) \
do { \
arrow::Status _s = (s); \
if (!_s.ok()) { \
DB::WriteBufferFromOwnString ss; \
ss << "Arrow error: " << _s.ToString(); \
throw orc::ParseError(ss.str()); \
} \
} while (0)
/// Implementation detail of ORC_ASSIGN_OR_THROW: stores the arrow::Result produced by `rexpr` in a
/// local variable named `status_name`, throws through ORC_THROW_NOT_OK if it holds an error, and
/// otherwise moves the contained value into `lhs`. Not wrapped in do/while, so `lhs` may be a
/// declaration (e.g. `int64_t size`) that stays in scope after the macro.
#define ORC_ASSIGN_OR_THROW_IMPL(status_name, lhs, rexpr) \
auto status_name = (rexpr); \
ORC_THROW_NOT_OK(status_name.status()); \
lhs = std::move(status_name).ValueOrDie();
/// Assign the value of the arrow::Result expression `rexpr` to `lhs`, or throw orc::ParseError.
/// __COUNTER__ gives every expansion its own temporary name. The expansion already ends with a
/// semicolon, which is why call sites in this file omit one.
#define ORC_ASSIGN_OR_THROW(lhs, rexpr) \
ORC_ASSIGN_OR_THROW_IMPL(ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), \
lhs, rexpr);
/// Opens a try-block; must be closed by ORC_END_CATCH_NOT_OK.
#define ORC_BEGIN_CATCH_NOT_OK try {
/// Closes the try-block opened by ORC_BEGIN_CATCH_NOT_OK and converts ORC exceptions into an
/// arrow::Status returned from the enclosing function (which therefore must return arrow::Status):
/// orc::ParseError -> IOError, orc::InvalidArgument -> Invalid, orc::NotImplementedYet ->
/// NotImplemented. Exceptions of any other type are not caught here and propagate.
#define ORC_END_CATCH_NOT_OK \
} \
catch (const orc::ParseError& e) { \
return arrow::Status::IOError(e.what()); \
} \
catch (const orc::InvalidArgument& e) { \
return arrow::Status::Invalid(e.what()); \
} \
catch (const orc::NotImplementedYet& e) { \
return arrow::Status::NotImplemented(e.what()); \
}
/// Run the statement `_s` inside the try/catch defined above. ORC_BEGIN_CATCH_NOT_OK is an
/// object-like macro, so `ORC_BEGIN_CATCH_NOT_OK(_s);` expands to `try { (_s);`.
#define ORC_CATCH_NOT_OK(_s) \
ORC_BEGIN_CATCH_NOT_OK(_s); \
ORC_END_CATCH_NOT_OK
/// ClickHouse error codes used by this file; they are defined elsewhere (hence `extern`).
namespace DB::ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
namespace local_engine
{
/// Size of the wrapped arrow file in bytes.
/// Throws orc::ParseError (via ORC_ASSIGN_OR_THROW) if arrow reports an error.
uint64_t ArrowInputFile::getLength() const
{
    ORC_ASSIGN_OR_THROW(int64_t size, file->GetSize())
    return static_cast<uint64_t>(size);
}

/// Read granularity reported to the ORC reader: a fixed 128 KiB.
uint64_t ArrowInputFile::getNaturalReadSize() const
{
    return 128 * 1024;
}

/// Read exactly `length` bytes starting at byte position `offset` into `buf`.
/// Throws orc::ParseError if arrow reports an error or returns fewer bytes than requested.
void ArrowInputFile::read(void * buf, uint64_t length, uint64_t offset)
{
    ORC_ASSIGN_OR_THROW(int64_t bytes_read, file->ReadAt(offset, length, buf))
    if (static_cast<uint64_t>(bytes_read) != length)
        throw orc::ParseError("Short read from arrow input file");
}

/// Constant stream name; the underlying file's path is not exposed through this wrapper.
const std::string & ArrowInputFile::getName() const
{
    static const std::string filename("ArrowInputFile");
    return filename;
}

/// Wrap `file_` in an ArrowInputFile and open an ORC reader over it, storing it in `*orc_reader`.
/// ORC exceptions are converted into a non-OK arrow::Status by ORC_CATCH_NOT_OK; other exception
/// types propagate. `*orc_reader` is assigned only when orc::createReader succeeds.
arrow::Status innerCreateOrcReader(std::shared_ptr<arrow::io::RandomAccessFile> file_, std::unique_ptr<orc::Reader> * orc_reader)
{
    /// Ownership of the input stream is handed to the ORC reader below.
    auto io_wrapper = std::make_unique<ArrowInputFile>(file_);
    orc::ReaderOptions options;
    ORC_CATCH_NOT_OK(*orc_reader = orc::createReader(std::move(io_wrapper), options))
    return arrow::Status::OK();
}

/// Create an ORC reader over `file_`.
/// Throws DB::Exception if the reader cannot be created; the arrow status message is appended.
std::unique_ptr<orc::Reader> OrcUtil::createOrcReader(std::shared_ptr<arrow::io::RandomAccessFile> file_)
{
    std::unique_ptr<orc::Reader> orc_reader;
    /// `file_` is not used after this call, so hand it over instead of copying the shared_ptr.
    auto status = innerCreateOrcReader(std::move(file_), &orc_reader);
    if (!status.ok())
        /// NOTE(review): failures here usually come from the input file rather than a programming
        /// bug, so LOGICAL_ERROR may be the wrong code — confirm with callers before changing it.
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Create orc reader failed. {}", status.message());
    return orc_reader;
}

/// Count the nodes of the type tree rooted at `type`: 1 for `type` itself plus, recursively, the
/// nodes of its list element, struct fields, or map key and item. Any other type counts as 1.
/// (Presumably this matches the number of ORC column ids the type occupies — confirm.)
size_t OrcUtil::countIndicesForType(std::shared_ptr<arrow::DataType> type)
{
    if (type->id() == arrow::Type::LIST)
        return countIndicesForType(static_cast<arrow::ListType *>(type.get())->value_type()) + 1;

    if (type->id() == arrow::Type::STRUCT)
    {
        /// size_t (not int) so the running sum matches the recursive results without narrowing.
        size_t indices = 1;
        for (const auto & field : type->fields())
            indices += countIndicesForType(field->type());
        return indices;
    }

    if (type->id() == arrow::Type::MAP)
    {
        auto * map_type = static_cast<arrow::MapType *>(type.get());
        return countIndicesForType(map_type->key_type()) + countIndicesForType(map_type->item_type()) + 1;
    }

    return 1;
}

/// Open `in` as an ORC file through arrow and read its schema.
/// On success `file_reader` owns the opened reader and `schema` holds the file schema; when
/// `format_settings.use_lowercase_column_name` is set, top-level field names are lower-cased
/// (nested field names are not touched).
/// If `is_stopped` is non-zero after DB::asArrowFile returns, returns early without assigning
/// `file_reader` or `schema`.
/// Throws DB::Exception (BAD_ARGUMENTS) if arrow fails to open the file or read its schema.
void OrcUtil::getFileReaderAndSchema(
    DB::ReadBuffer & in,
    std::unique_ptr<arrow::adapters::orc::ORCFileReader> & file_reader,
    std::shared_ptr<arrow::Schema> & schema,
    const DB::FormatSettings & format_settings,
    std::atomic<int> & is_stopped)
{
    auto arrow_file = DB::asArrowFile(in, format_settings, is_stopped, "ORC", ORC_MAGIC_BYTES);
    if (is_stopped)
        return;

    auto result = arrow::adapters::orc::ORCFileReader::Open(arrow_file, arrow::default_memory_pool());
    if (!result.ok())
        throw DB::Exception::createRuntime(DB::ErrorCodes::BAD_ARGUMENTS, result.status().ToString());
    file_reader = std::move(result).ValueOrDie();

    auto read_schema_result = file_reader->ReadSchema();
    if (!read_schema_result.ok())
        throw DB::Exception::createRuntime(DB::ErrorCodes::BAD_ARGUMENTS, read_schema_result.status().ToString());
    schema = std::move(read_schema_result).ValueOrDie();

    if (format_settings.use_lowercase_column_name)
    {
        std::vector<std::shared_ptr<::arrow::Field>> fields;
        fields.reserve(schema->num_fields());
        for (const auto & field : schema->fields())
        {
            auto name = field->name();
            boost::to_lower(name);
            fields.push_back(field->WithName(name));
        }
        /// The right-hand side (including schema->metadata()) is evaluated before `schema` is replaced.
        schema = arrow::schema(std::move(fields), schema->metadata());
    }
}
}