blob: 5cabe069e255191abf209c91fdc57721ad69fa52 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "OutputFormatFile.h"
#include <DataTypes/IDataType.h>
#include <Processors/Port.h>
#include <Storages/Output/ORCOutputFormatFile.h>
#include <Storages/Output/ParquetOutputFormatFile.h>
#include <boost/algorithm/string/case_conv.hpp>
namespace DB
{
namespace ErrorCodes
{
    /// Error codes referenced from this file. LOGICAL_ERROR is thrown by
    /// OutputFormatFile::createHeaderWithPreferredSchema and NOT_IMPLEMENTED by
    /// OutputFormatFileUtil::createFile; declaring each one here keeps the file
    /// from depending on another header happening to declare it.
    extern const int NOT_IMPLEMENTED;
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
}
}
namespace local_engine
{
using namespace DB;
/// Stores the context, target file URI, write-buffer builder and preferred
/// schema in the corresponding members.
///
/// `context_` and `write_buffer_builder_` are received by value, so they are
/// moved into the members instead of being copied a second time. `file_uri_`
/// and `preferred_schema_` are received by const reference and are copied.
OutputFormatFile::OutputFormatFile(
    DB::ContextPtr context_,
    const std::string & file_uri_,
    WriteBufferBuilderPtr write_buffer_builder_,
    const DB::Block & preferred_schema_)
    : context(std::move(context_))
    , file_uri(file_uri_)
    , write_buffer_builder(std::move(write_buffer_builder_))
    , preferred_schema(preferred_schema_)
{
}
/// Builds a header whose columns take their names and types from `preferred_schema`.
///
/// The columns of `header` are walked in order. Columns whose name starts with
/// "__bucket_value__" are skipped; every other column is paired positionally
/// with the next column of `preferred_schema`. Each output column holds a
/// column obtained from the preferred type's createColumn(), together with the
/// preferred type and the preferred name.
///
/// Throws LOGICAL_ERROR when `preferred_schema.empty()` is true, or when
/// `header` has more non-skipped columns than `preferred_schema` has columns.
Block OutputFormatFile::createHeaderWithPreferredSchema(const Block & header)
{
    if (preferred_schema.empty())
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "preferred_schema is empty");

    const size_t preferred_columns = preferred_schema.columns();

    /// Create a new header with the preferred column name and type
    DB::ColumnsWithTypeAndName columns;
    columns.reserve(preferred_columns);
    size_t index = 0;
    for (const auto & name_type : header.getNamesAndTypesList())
    {
        if (name_type.name.starts_with("__bucket_value__"))
            continue;

        /// The positional lookup below is not otherwise bounded by the size of
        /// preferred_schema, so fail explicitly instead of indexing past its end.
        /// NOTE(review): whether getByPosition validates its argument is not
        /// visible from this file - confirm against Block.
        if (index >= preferred_columns)
            throw DB::Exception(
                DB::ErrorCodes::LOGICAL_ERROR,
                "preferred_schema has only {} columns, but the header requires more",
                preferred_columns);

        const auto & preferred_column = preferred_schema.getByPosition(index++);
        ColumnWithTypeAndName column(preferred_column.type->createColumn(), preferred_column.type, preferred_column.name);
        columns.emplace_back(std::move(column));
    }
    return {std::move(columns)};
}
/// Picks the concrete output format for `file_uri` and constructs it.
///
/// A format is selected when the lower-cased URI ends with its extension or
/// when the lower-cased `format_hint` equals its name. Parquet is tried first
/// and ORC second, each only if its USE_* macro is enabled. With both enabled,
/// a "parquet" hint therefore wins over an ".orc" extension, while an "orc"
/// hint does not win over a ".parquet" extension.
///
/// Throws NOT_IMPLEMENTED if no enabled format matches.
OutputFormatFilePtr OutputFormatFileUtil::createFile(
    DB::ContextPtr context,
    local_engine::WriteBufferBuilderPtr write_buffer_builder,
    const std::string & file_uri,
    const DB::Block & preferred_schema,
    const std::string & format_hint)
{
    /// Case-insensitive match on either the URI suffix or the format hint.
    /// The hint is only lower-cased when the suffix did not already match.
    /// Marked maybe_unused because both format branches can be compiled out.
    [[maybe_unused]] const auto is_requested = [&](const char * extension, const char * format_name) -> bool
    {
        if (boost::to_lower_copy(file_uri).ends_with(extension))
            return true;
        return format_name == boost::to_lower_copy(format_hint);
    };

#if USE_PARQUET
    if (is_requested(".parquet", "parquet"))
        return std::make_shared<ParquetOutputFormatFile>(context, file_uri, write_buffer_builder, preferred_schema);
#endif

#if USE_ORC
    if (is_requested(".orc", "orc"))
        return std::make_shared<ORCOutputFormatFile>(context, file_uri, write_buffer_builder, preferred_schema);
#endif

    throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Format not supported for file :{}", file_uri);
}
}