blob: 582dece6668ddce91da5127cee1b7ad35c300c78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <city.h>
#include <cstring>
#include <base/types.h>
#include <base/unaligned.h>
#include <base/defines.h>
#include <IO/WriteHelpers.h>
#include <Compression/CompressionFactory.h>
#include <Common/Stopwatch.h>
#include "CompressedWriteBuffer.h"
using namespace DB;
namespace local_engine
{
void CompressedWriteBuffer::nextImpl()
{
if (!offset())
return;
chassert(offset() <= INT_MAX);
UInt32 decompressed_size = static_cast<UInt32>(offset());
UInt32 compressed_reserve_size = codec->getCompressedReserveSize(decompressed_size);
/** During compression we need buffer with capacity >= compressed_reserve_size + CHECKSUM_SIZE.
*
* If output buffer has necessary capacity, we can compress data directly into the output buffer.
* Then we can write checksum at the output buffer begin.
*
* If output buffer does not have necessary capacity. Compress data into a temporary buffer.
* Then we can write checksum and copy the temporary buffer into the output buffer.
*/
Stopwatch compress_time_watch;
if (out.available() >= compressed_reserve_size + sizeof(CityHash_v1_0_2::uint128))
{
char * out_compressed_ptr = out.position() + sizeof(CityHash_v1_0_2::uint128);
UInt32 compressed_size = codec->compress(working_buffer.begin(), decompressed_size, out_compressed_ptr);
compress_time += compress_time_watch.elapsedNanoseconds();
CityHash_v1_0_2::uint128 checksum_(0,0);
if (checksum)
{
checksum_ = CityHash_v1_0_2::CityHash128(out_compressed_ptr, compressed_size);
}
writeBinaryLittleEndian(checksum_.low64, out);
writeBinaryLittleEndian(checksum_.high64, out);
out.position() += compressed_size;
}
else
{
compressed_buffer.resize_exact(compressed_reserve_size);
UInt32 compressed_size = codec->compress(working_buffer.begin(), decompressed_size, compressed_buffer.data());
compress_time += compress_time_watch.elapsedNanoseconds();
CityHash_v1_0_2::uint128 checksum_(0,0);
if (checksum)
{
checksum_ = CityHash_v1_0_2::CityHash128(compressed_buffer.data(), compressed_size);
}
writeBinaryLittleEndian(checksum_.low64, out);
writeBinaryLittleEndian(checksum_.high64, out);
Stopwatch write_time_watch;
out.write(compressed_buffer.data(), compressed_size);
write_time += write_time_watch.elapsedNanoseconds();
}
}
/// Finalizes the buffer when it is destroyed, presumably pushing any still-buffered data
/// through nextImpl() (see WriteBuffer::finalize).
/// NOTE(review): destructors are implicitly noexcept, so if finalize() throws (e.g. the
/// underlying write fails) std::terminate is called. Prefer finalizing explicitly before
/// destruction — confirm callers do so.
CompressedWriteBuffer::~CompressedWriteBuffer()
{
    this->finalize();
}
CompressedWriteBuffer::CompressedWriteBuffer(WriteBuffer & out_, CompressionCodecPtr codec_, size_t buf_size, bool checksum_)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), codec(std::move(codec_)), checksum(checksum_)
{
}
}