| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "NativeWriter.h" |
| #include <IO/WriteBuffer.h> |
| #include <IO/WriteHelpers.h> |
| #include <DataTypes/Serializations/ISerialization.h> |
| #include <Columns/ColumnSparse.h> |
| #include <Columns/ColumnString.h> |
| #include <Storages/IO/AggregateSerializationUtils.h> |
| #include <Functions/FunctionHelpers.h> |
| #include <DataTypes/DataTypeAggregateFunction.h> |
| #include <DataTypes/DataTypeLowCardinality.h> |
| |
| |
| using namespace DB; |
| |
| namespace local_engine |
| { |
| |
| const String NativeWriter::AGG_STATE_SUFFIX= "#optagg"; |
| |
| void NativeWriter::flush() |
| { |
| ostr.next(); |
| } |
| |
| static void writeData(const ISerialization & serialization, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit) |
| { |
| /** If there are columns-constants - then we materialize them. |
| * (Since the data type does not know how to serialize / deserialize constants.) |
| */ |
| ColumnPtr full_column = column->convertToFullColumnIfConst(); |
| |
| ISerialization::SerializeBinaryBulkSettings settings; |
| settings.getter = [&ostr](ISerialization::SubstreamPath) -> WriteBuffer * { return &ostr; }; |
| settings.position_independent_encoding = false; |
| settings.low_cardinality_max_dictionary_size = 0; |
| |
| ISerialization::SerializeBinaryBulkStatePtr state; |
| serialization.serializeBinaryBulkStatePrefix(*full_column, settings, state); |
| serialization.serializeBinaryBulkWithMultipleStreams(*full_column, offset, limit, settings, state); |
| serialization.serializeBinaryBulkStateSuffix(settings, state); |
| } |
| |
| size_t NativeWriter::write(const DB::Block & block) |
| { |
| size_t written_before = ostr.count(); |
| |
| block.checkNumberOfRows(); |
| |
| /// Dimensions |
| size_t columns = block.columns(); |
| size_t rows = block.rows(); |
| |
| writeVarUInt(columns, ostr); |
| writeVarUInt(rows, ostr); |
| |
| for (size_t i = 0; i < columns; ++i) |
| { |
| auto column = block.safeGetByPosition(i); |
| /// agg state will convert to fixedString, need write actual agg state type |
| auto original_type = header.safeGetByPosition(i).type; |
| original_type = recursiveRemoveLowCardinality(original_type); |
| /// Type |
| String type_name = original_type->getName(); |
| bool is_agg_opt = WhichDataType(original_type).isAggregateFunction() |
| && header.safeGetByPosition(i).column->getDataType() != block.safeGetByPosition(i).column->getDataType(); |
| if (is_agg_opt) |
| { |
| writeStringBinary(type_name + AGG_STATE_SUFFIX, ostr); |
| } |
| else |
| { |
| writeStringBinary(type_name, ostr); |
| } |
| |
| column.column = recursiveRemoveSparse(column.column); |
| column.column = recursiveRemoveLowCardinality(column.column); |
| SerializationPtr serialization = recursiveRemoveLowCardinality(column.type)->getDefaultSerialization(); |
| /// Data |
| if (rows) /// Zero items of data is always represented as zero number of bytes. |
| { |
| const auto * agg_type = checkAndGetDataType<DataTypeAggregateFunction>(original_type.get()); |
| if (is_agg_opt && agg_type && !isFixedSizeAggregateFunction(agg_type->getFunction())) |
| { |
| const auto * str_col = static_cast<const ColumnString *>(column.column.get()); |
| const PaddedPODArray<UInt8> & column_chars = str_col->getChars(); |
| ostr.write(column_chars.raw_data(), str_col->getOffsets().back()); |
| } |
| else |
| { |
| writeData(*serialization, column.column, ostr, 0, 0); |
| } |
| } |
| } |
| |
| size_t written_after = ostr.count(); |
| size_t written_size = written_after - written_before; |
| return written_size; |
| } |
| } |