blob: 18417fb3b7e240ff20860c307257aed9c47b4722 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "SparkRowToCHColumn.h"
#include <memory>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionHelpers.h>
#include <Common/CHUtil.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_TYPE;
extern const int LOGICAL_ERROR;
}
}
using namespace DB;
namespace local_engine
{
jclass SparkRowToCHColumn::spark_row_interator_class = nullptr;
jmethodID SparkRowToCHColumn::spark_row_interator_hasNext = nullptr;
jmethodID SparkRowToCHColumn::spark_row_interator_next = nullptr;
jmethodID SparkRowToCHColumn::spark_row_iterator_nextBatch = nullptr;
ALWAYS_INLINE static void writeRowToColumns(const std::vector<MutableColumnPtr> & columns, const SparkRowReader & spark_row_reader)
{
auto num_fields = columns.size();
for (size_t i = 0; i < num_fields; i++)
{
if (spark_row_reader.supportRawData(i))
{
const StringRef str_ref{spark_row_reader.getStringRef(i)};
if (str_ref.data == nullptr)
columns[i]->insertDefault();
else if (!spark_row_reader.isBigEndianInSparkRow(i))
columns[i]->insertData(str_ref.data, str_ref.size);
else
columns[i]->insert(spark_row_reader.getField(i)); // read decimal128
}
else
columns[i]->insert(spark_row_reader.getField(i));
}
}
std::unique_ptr<Block> SparkRowToCHColumn::convertSparkRowInfoToCHColumn(const SparkRowInfo & spark_row_info, const Block & header)
{
auto block = std::make_unique<Block>();
const auto num_rows = spark_row_info.getNumRows();
if (header.columns())
{
*block = header.cloneEmpty();
MutableColumns mutable_columns{block->mutateColumns()};
for (size_t col_i = 0; col_i < header.columns(); ++col_i)
mutable_columns[col_i]->reserve(num_rows);
DataTypes types{header.getDataTypes()};
SparkRowReader row_reader(types);
for (int64_t i = 0; i < num_rows; i++)
{
row_reader.pointTo(
spark_row_info.getBufferAddress() + spark_row_info.getOffsets()[i], static_cast<int32_t>(spark_row_info.getLengths()[i]));
writeRowToColumns(mutable_columns, row_reader);
}
block->setColumns(std::move(mutable_columns));
}
else
{
// This is a special case for count(1)/count(*)
*block = BlockUtil::buildRowCountBlock(num_rows);
}
return block;
}
void SparkRowToCHColumn::appendSparkRowToCHColumn(SparkRowToCHColumnHelper & helper, char * buffer, int32_t length)
{
SparkRowReader row_reader(helper.data_types);
row_reader.pointTo(buffer, length);
writeRowToColumns(helper.mutable_columns, row_reader);
++helper.rows;
}
Block * SparkRowToCHColumn::getBlock(SparkRowToCHColumnHelper & helper)
{
auto * block = new Block();
if (helper.header.columns())
{
*block = helper.header.cloneEmpty();
block->setColumns(std::move(helper.mutable_columns));
}
else
{
// In some cases, there is no required columns in spark plan, E.g. count(*).
// In these cases, the rows is the only needed information, so we try to create
// a block with a const column which will not be really used any where.
auto uint8_ty = std::make_shared<DB::DataTypeUInt8>();
auto col = uint8_ty->createColumnConst(helper.rows, 0);
ColumnWithTypeAndName named_col(col, uint8_ty, "__anonymous_col__");
block->insert(named_col);
}
return block;
}
VariableLengthDataReader::VariableLengthDataReader(const DataTypePtr & type_)
: type(type_), type_without_nullable(removeNullable(type)), which(type_without_nullable)
{
if (!BackingDataLengthCalculator::isVariableLengthDataType(type_without_nullable))
throw Exception(ErrorCodes::UNKNOWN_TYPE, "VariableLengthDataReader doesn't support type {}", type->getName());
}
Field VariableLengthDataReader::read(const char * buffer, size_t length) const
{
if (which.isStringOrFixedString())
return readString(buffer, length);
if (which.isDecimal128())
return readDecimal(buffer, length);
if (which.isArray())
return readArray(buffer, length);
if (which.isMap())
return readMap(buffer, length);
if (which.isTuple())
return readStruct(buffer, length);
throw Exception(ErrorCodes::UNKNOWN_TYPE, "VariableLengthDataReader doesn't support type {}", type->getName());
}
StringRef VariableLengthDataReader::readUnalignedBytes(const char * buffer, size_t length) const
{
return {buffer, length};
}
Field VariableLengthDataReader::readDecimal(const char * buffer, size_t length) const
{
assert(sizeof(Decimal128) >= length);
char decimal128_fix_data[sizeof(Decimal128)] = {};
if (Int8 (buffer[0]) < 0)
{
memset(decimal128_fix_data, int('\xff'), sizeof(Decimal128));
}
memcpy(decimal128_fix_data + sizeof(Decimal128) - length, buffer, length); // padding
String buf(decimal128_fix_data, sizeof(Decimal128));
BackingDataLengthCalculator::swapDecimalEndianBytes(buf); // Big-endian to Little-endian
auto * decimal128 = reinterpret_cast<Decimal128 *>(buf.data());
const auto * decimal128_type = typeid_cast<const DataTypeDecimal128 *>(type_without_nullable.get());
return DecimalField<Decimal128>(std::move(*decimal128), decimal128_type->getScale());
}
Field VariableLengthDataReader::readString(const char * buffer, size_t length) const
{
String str(buffer, length);
return Field(std::move(str));
}
Field VariableLengthDataReader::readArray(const char * buffer, [[maybe_unused]] size_t length) const
{
/// 内存布局:numElements(8B) | null_bitmap(与numElements成正比) | values(每个值长度与类型有关) | backing data
/// Read numElements
int64_t num_elems = 0;
memcpy(&num_elems, buffer, 8);
if (num_elems == 0 || length == 0)
return Array();
/// Skip null_bitmap
const auto len_null_bitmap = calculateBitSetWidthInBytes(num_elems);
/// Read values
const auto * array_type = typeid_cast<const DataTypeArray *>(type_without_nullable.get());
const auto & nested_type = array_type->getNestedType();
const auto elem_size = BackingDataLengthCalculator::getArrayElementSize(nested_type);
Array array;
array.reserve(num_elems);
if (BackingDataLengthCalculator::isFixedLengthDataType(removeNullable(nested_type)))
{
FixedLengthDataReader reader(nested_type);
for (int64_t i = 0; i < num_elems; ++i)
{
if (isBitSet(buffer + 8, i))
{
array.emplace_back(Null{});
}
else
{
const auto elem = reader.read(buffer + 8 + len_null_bitmap + i * elem_size);
array.emplace_back(elem);
}
}
}
else if (BackingDataLengthCalculator::isVariableLengthDataType(removeNullable(nested_type)))
{
VariableLengthDataReader reader(nested_type);
for (int64_t i = 0; i < num_elems; ++i)
{
if (isBitSet(buffer + 8, i))
{
array.emplace_back(Null{});
}
else
{
int64_t offset_and_size = 0;
memcpy(&offset_and_size, buffer + 8 + len_null_bitmap + i * 8, 8);
const int64_t offset = BackingDataLengthCalculator::extractOffset(offset_and_size);
const int64_t size = BackingDataLengthCalculator::extractSize(offset_and_size);
const auto elem = reader.read(buffer + offset, size);
array.emplace_back(elem);
}
}
}
else
throw Exception(ErrorCodes::UNKNOWN_TYPE, "VariableLengthDataReader doesn't support type {}", nested_type->getName());
return std::move(array);
}
Field VariableLengthDataReader::readMap(const char * buffer, size_t length) const
{
/// 内存布局:Length of UnsafeArrayData of key(8B) | UnsafeArrayData of key | UnsafeArrayData of value
/// Read Length of UnsafeArrayData of key
int64_t key_array_size = 0;
memcpy(&key_array_size, buffer, 8);
if (key_array_size == 0 || length == 0)
return Map();
/// Read UnsafeArrayData of keys
const auto * map_type = typeid_cast<const DataTypeMap *>(type_without_nullable.get());
const auto & key_type = map_type->getKeyType();
const auto key_array_type = std::make_shared<DataTypeArray>(key_type);
VariableLengthDataReader key_reader(key_array_type);
auto key_field = key_reader.read(buffer + 8, key_array_size);
auto & key_array = key_field.safeGet<Array>();
/// Read UnsafeArrayData of values
const auto & val_type = map_type->getValueType();
const auto val_array_type = std::make_shared<DataTypeArray>(val_type);
VariableLengthDataReader val_reader(val_array_type);
auto val_field = val_reader.read(buffer + 8 + key_array_size, length - 8 - key_array_size);
auto & val_array = val_field.safeGet<Array>();
/// Construct map in CH way [(k1, v1), (k2, v2), ...]
if (key_array.size() != val_array.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Key size {} not equal to value size {} in map", key_array.size(), val_array.size());
Map map(key_array.size());
for (size_t i = 0; i < key_array.size(); ++i)
{
Tuple tuple(2);
tuple[0] = std::move(key_array[i]);
tuple[1] = std::move(val_array[i]);
map[i] = std::move(tuple);
}
return std::move(map);
}
Field VariableLengthDataReader::readStruct(const char * buffer, size_t /*length*/) const
{
/// 内存布局:null_bitmap(字节数与字段数成正比) | values(num_fields * 8B) | backing data
const auto * tuple_type = typeid_cast<const DataTypeTuple *>(type_without_nullable.get());
const auto & field_types = tuple_type->getElements();
const auto num_fields = field_types.size();
if (num_fields == 0)
return Tuple();
const auto len_null_bitmap = calculateBitSetWidthInBytes(num_fields);
Tuple tuple(num_fields);
for (size_t i = 0; i < num_fields; ++i)
{
const auto & field_type = field_types[i];
if (isBitSet(buffer, i))
{
tuple[i] = Null{};
continue;
}
if (BackingDataLengthCalculator::isFixedLengthDataType(removeNullable(field_type)))
{
FixedLengthDataReader reader(field_type);
tuple[i] = reader.read(buffer + len_null_bitmap + i * 8);
}
else if (BackingDataLengthCalculator::isVariableLengthDataType(removeNullable(field_type)))
{
int64_t offset_and_size = 0;
memcpy(&offset_and_size, buffer + len_null_bitmap + i * 8, 8);
const int64_t offset = BackingDataLengthCalculator::extractOffset(offset_and_size);
const int64_t size = BackingDataLengthCalculator::extractSize(offset_and_size);
VariableLengthDataReader reader(field_type);
tuple[i] = reader.read(buffer + offset, size);
}
else
throw Exception(ErrorCodes::UNKNOWN_TYPE, "VariableLengthDataReader doesn't support type {}", field_type->getName());
}
return std::move(tuple);
}
FixedLengthDataReader::FixedLengthDataReader(const DataTypePtr & type_)
: type(type_), type_without_nullable(removeNullable(type)), which(type_without_nullable)
{
if (type->onlyNull())
{
value_size = 0;
return;
}
if (!BackingDataLengthCalculator::isFixedLengthDataType(type_without_nullable) || !type_without_nullable->isValueRepresentedByNumber())
throw Exception(ErrorCodes::UNKNOWN_TYPE, "FixedLengthDataReader doesn't support type {}", type->getName());
value_size = type_without_nullable->getSizeOfValueInMemory();
}
StringRef FixedLengthDataReader::unsafeRead(const char * buffer) const
{
return {buffer, value_size};
}
Field FixedLengthDataReader::read(const char * buffer) const
{
if (which.isUInt8())
{
UInt8 value = 0;
memcpy(&value, buffer, 1);
return value;
}
if (which.isUInt16() || which.isDate())
{
UInt16 value = 0;
memcpy(&value, buffer, 2);
return value;
}
if (which.isUInt32())
{
UInt32 value = 0;
memcpy(&value, buffer, 4);
return value;
}
if (which.isUInt64())
{
UInt64 value = 0;
memcpy(&value, buffer, 8);
return value;
}
if (which.isInt8())
{
Int8 value = 0;
memcpy(&value, buffer, 1);
return value;
}
if (which.isInt16())
{
Int16 value = 0;
memcpy(&value, buffer, 2);
return value;
}
if (which.isInt32() || which.isDate32())
{
Int32 value = 0;
memcpy(&value, buffer, 4);
return value;
}
if (which.isInt64())
{
Int64 value = 0;
memcpy(&value, buffer, 8);
return value;
}
if (which.isFloat32())
{
Float32 value = 0.0;
memcpy(&value, buffer, 4);
return value;
}
if (which.isFloat64())
{
Float64 value = 0.0;
memcpy(&value, buffer, 8);
return value;
}
if (which.isDecimal32())
{
Decimal32 value = 0;
memcpy(&value, buffer, 4);
const auto * decimal32_type = typeid_cast<const DataTypeDecimal32 *>(type_without_nullable.get());
return DecimalField{value, decimal32_type->getScale()};
}
if (which.isDecimal64() || which.isDateTime64())
{
Decimal64 value = 0;
memcpy(&value, buffer, 8);
UInt32 scale = which.isDecimal64() ? typeid_cast<const DataTypeDecimal64 *>(type_without_nullable.get())->getScale()
: typeid_cast<const DataTypeDateTime64 *>(type_without_nullable.get())->getScale();
return DecimalField{value, scale};
}
throw Exception(ErrorCodes::UNKNOWN_TYPE, "FixedLengthDataReader doesn't support type {}", type->getName());
}
}