blob: d0e2a4b3b6203bc713c166eede45226f9f6e4308 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PartitionColumnFillingTransform.h"
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Common/GlutenStringUtils.h>
using namespace DB;
/// Declaration of the error code this file throws from createPartitionColumn() when the
/// partition column type is not one it knows how to build. It is declared `extern` here,
/// so the actual value is defined in another translation unit.
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_TYPE;
}
}
namespace local_engine
{
/// Parses `partition_value` as an integer of type `Type` and wraps it in a one-row constant
/// column created by `column_type`.
///
/// `Type` is restricted by the requires-clause to Int8, UInt16, Int16, Int32 or Int64.
/// The column is created from `column_type`, which the caller in this file passes as the full
/// partition column type rather than the nested type.
///
/// NOTE(review): nothing here checks that the whole string was consumed by the parser;
/// confirm how readIntText treats malformed or trailing input.
template <typename Type>
requires(
    std::is_same_v<Type, Int8> || std::is_same_v<Type, UInt16> || std::is_same_v<Type, Int16> || std::is_same_v<Type, Int32>
    || std::is_same_v<Type, Int64>)
ColumnPtr createIntPartitionColumn(DataTypePtr column_type, const std::string & partition_value)
{
    ReadBufferFromString text(partition_value);
    Type parsed{};
    readIntText(parsed, text);
    return column_type->createColumnConst(1, parsed);
}
/// Parses `partition_value` as a floating-point number of type `Type` and wraps it in a
/// one-row constant column created by `column_type`.
///
/// `Type` is restricted by the requires-clause to Float32 or Float64.
///
/// NOTE(review): nothing here checks that the whole string was consumed by the parser;
/// confirm how readFloatText treats malformed or trailing input.
template <typename Type>
requires(std::is_same_v<Type, Float32> || std::is_same_v<Type, Float64>)
ColumnPtr createFloatPartitionColumn(DataTypePtr column_type, const std::string & partition_value)
{
    ReadBufferFromString text(partition_value);
    Type parsed{};
    readFloatText(parsed, text);
    return column_type->createColumnConst(1, parsed);
}
/// Creates a transform that adds one partition column, with a fixed value, to every chunk.
///
/// @param input_                Header of the incoming chunks.
/// @param output_               Header of the produced chunks; must contain the partition column.
/// @param partition_col_name_   Name of the partition column to fill.
/// @param partition_col_value_  Textual partition value to parse into that column.
///
/// The partition column's type is taken from `output_`, and the one-row constant column is
/// built once here so transform() only has to resize it.
PartitionColumnFillingTransform::PartitionColumnFillingTransform(
    const DB::Block & input_, const DB::Block & output_, const String & partition_col_name_, const String & partition_col_value_)
    : ISimpleTransform(input_, output_, true)
    , partition_col_name(partition_col_name_)
    , partition_col_value(partition_col_value_)
{
    // NOTE(review): the `true` argument presumably requests a case-insensitive name lookup; confirm against Block::getByName.
    const auto & partition_col = output_.getByName(partition_col_name_, true);
    partition_col_type = partition_col.type;

    // createPartitionColumn() reads partition_col_type and partition_col_value, so it must run last.
    partition_column = createPartitionColumn();
}
/// Builds the one-row constant column that holds this transform's partition value.
///
/// If the partition column type is Nullable, the nested type decides how the text is parsed,
/// and a value accepted by GlutenStringUtils::isNullPartitionValue yields a constant column of
/// the Nullable type's default value. Otherwise the text is parsed according to the (nested)
/// type: Int8, Int16, Int32, Int64, Float32, Float64, Date or String. In every case the column
/// is created from the full `partition_col_type`, not from the nested type.
///
/// @return A constant column of size 1 typed as `partition_col_type`.
/// @throws Exception with ErrorCodes::UNKNOWN_TYPE when the (nested) type is none of the above.
ColumnPtr PartitionColumnFillingTransform::createPartitionColumn()
{
    DataTypePtr nested_type = partition_col_type;
    if (const DataTypeNullable * nullable_type = checkAndGetDataType<DataTypeNullable>(partition_col_type.get()))
    {
        nested_type = nullable_type->getNestedType();
        if (GlutenStringUtils::isNullPartitionValue(partition_col_value))
            return nullable_type->createColumnConstWithDefaultValue(1);
    }

    WhichDataType which(nested_type);

    if (which.isInt8())
        return createIntPartitionColumn<Int8>(partition_col_type, partition_col_value);
    if (which.isInt16())
        return createIntPartitionColumn<Int16>(partition_col_type, partition_col_value);
    if (which.isInt32())
        return createIntPartitionColumn<Int32>(partition_col_type, partition_col_value);
    if (which.isInt64())
        return createIntPartitionColumn<Int64>(partition_col_type, partition_col_value);

    if (which.isFloat32())
        return createFloatPartitionColumn<Float32>(partition_col_type, partition_col_value);
    if (which.isFloat64())
        return createFloatPartitionColumn<Float64>(partition_col_type, partition_col_value);

    if (which.isDate())
    {
        DayNum value;
        auto value_buffer = ReadBufferFromString(partition_col_value);
        readDateText(value, value_buffer);
        return partition_col_type->createColumnConst(1, value);
    }

    if (which.isString())
        return partition_col_type->createColumnConst(1, partition_col_value);

    // Report the full type name: the family name of a Nullable column is just the outer
    // wrapper and would hide the type that was actually rejected.
    throw Exception(ErrorCodes::UNKNOWN_TYPE, "unsupported datatype {}", partition_col_type->getName());
}
/// Adds the partition column to `chunk`, resized to the chunk's row count.
///
/// The target index is the partition column's position in the output header. When that index
/// equals the number of columns in the input header the column is appended; otherwise it is
/// inserted at that index.
///
/// NOTE(review): the constructor looks the column up with an extra `true` argument while this
/// lookup passes only the name; confirm both resolve the same column for every caller.
void PartitionColumnFillingTransform::transform(DB::Chunk & chunk)
{
    const size_t target_position = output.getHeader().getPositionByName(partition_col_name);
    const size_t input_column_count = input.getHeader().columns();

    if (target_position != input_column_count)
    {
        chunk.addColumn(target_position, partition_column->cloneResized(chunk.getNumRows()));
        return;
    }

    // The partition column is last in the output: append it.
    chunk.addColumn(partition_column->cloneResized(chunk.getNumRows()));
}
}