blob: c19005cd13ddd36f46c998869d8e22a3bff7ddef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "NativeWriter.h"
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <DataTypes/Serializations/ISerialization.h>
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnString.h>
#include <Storages/IO/AggregateSerializationUtils.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeLowCardinality.h>
using namespace DB;
namespace local_engine
{
/// Marker appended to a column's type name by NativeWriter::write() when the column is
/// flagged as an "optimized" aggregate state (see the is_agg_opt check there).
const String NativeWriter::AGG_STATE_SUFFIX= "#optagg";
/// Calls next() on the output buffer (presumably handing any pending bytes to the underlying sink).
void NativeWriter::flush()
{
    ostr.next();
}
static void writeData(const ISerialization & serialization, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit)
{
/** If there are columns-constants - then we materialize them.
* (Since the data type does not know how to serialize / deserialize constants.)
*/
ColumnPtr full_column = column->convertToFullColumnIfConst();
ISerialization::SerializeBinaryBulkSettings settings;
settings.getter = [&ostr](ISerialization::SubstreamPath) -> WriteBuffer * { return &ostr; };
settings.position_independent_encoding = false;
settings.low_cardinality_max_dictionary_size = 0;
ISerialization::SerializeBinaryBulkStatePtr state;
serialization.serializeBinaryBulkStatePrefix(*full_column, settings, state);
serialization.serializeBinaryBulkWithMultipleStreams(*full_column, offset, limit, settings, state);
serialization.serializeBinaryBulkStateSuffix(settings, state);
}
/** Writes `block` to the output buffer and returns how many bytes were added.
  *
  * Output order:
  *   1. number of columns and number of rows, both as var-uints;
  *   2. for each column: its type name as a binary string, followed by the column
  *      data (nothing is written for the data when the block has zero rows).
  *
  * The type name is taken from the writer's `header`, not from the block, with
  * LowCardinality stripped. When the header type is an aggregate function and the
  * block column's data type differs from the header column's, the name gets
  * AGG_STATE_SUFFIX appended.
  */
size_t NativeWriter::write(const DB::Block & block)
{
    const size_t bytes_at_start = ostr.count();

    block.checkNumberOfRows();

    /// Dimensions
    const size_t num_columns = block.columns();
    const size_t num_rows = block.rows();
    writeVarUInt(num_columns, ostr);
    writeVarUInt(num_rows, ostr);

    for (size_t pos = 0; pos < num_columns; ++pos)
    {
        /// Taken by value on purpose: the column pointer is replaced further down.
        auto entry = block.safeGetByPosition(pos);
        const auto & header_entry = header.safeGetByPosition(pos);

        /// An aggregate state may arrive converted (e.g. to a string-like column),
        /// so the real type recorded in the header is what gets written.
        const DataTypePtr declared_type = recursiveRemoveLowCardinality(header_entry.type);

        /// Type
        String name_to_write = declared_type->getName();
        const bool is_agg_opt = WhichDataType(declared_type).isAggregateFunction()
            && header_entry.column->getDataType() != entry.column->getDataType();
        if (is_agg_opt)
            name_to_write += AGG_STATE_SUFFIX;
        writeStringBinary(name_to_write, ostr);

        /// Strip sparse and LowCardinality wrappers from the column before serializing.
        entry.column = recursiveRemoveLowCardinality(recursiveRemoveSparse(entry.column));
        const SerializationPtr serialization = recursiveRemoveLowCardinality(entry.type)->getDefaultSerialization();

        /// Data
        /// Zero items of data is always represented as zero number of bytes.
        if (num_rows == 0)
            continue;

        const auto * agg_type = checkAndGetDataType<DataTypeAggregateFunction>(declared_type.get());
        const bool write_raw_chars = is_agg_opt && agg_type && !isFixedSizeAggregateFunction(agg_type->getFunction());
        if (!write_raw_chars)
        {
            writeData(*serialization, entry.column, ostr, 0, 0);
            continue;
        }

        /// Non-fixed-size optimized aggregate state: the column is treated as a ColumnString
        /// and its character buffer is dumped up to the last offset, without the offsets.
        const auto * string_column = static_cast<const ColumnString *>(entry.column.get());
        const PaddedPODArray<UInt8> & chars = string_column->getChars();
        ostr.write(chars.raw_data(), string_column->getOffsets().back());
    }

    return ostr.count() - bytes_at_start;
}
}