blob: 8d37bca0546b9d7b308b9b23d7b11b4237ef75a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "DeltaDVRoaringBitmapArray.h"
#include <zlib.h>
#include <limits>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <substrait/plan.pb.h>
#include <roaring.hh>
#include <Poco/URI.h>
#include <Common/PODArray.h>
// ClickHouse error codes available to the DB::Exception throws in this file.
// They are only declared (`extern`) here; the definitions live in another translation unit.
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_LARGE_ARRAY_SIZE;
extern const int INCORRECT_DATA;
extern const int BAD_ARGUMENTS;
}
}
namespace local_engine
{
using namespace DB;
/// Splits a 64-bit value into {upper 32 bits, lower 32 bits}.
/// The upper half is used as the bucket index, the lower half as the value inside that bucket.
std::pair<UInt32, UInt32> DeltaDVRoaringBitmapArray::decompose_high_low_bytes(UInt64 value)
{
    const auto high_part = static_cast<UInt32>(value >> 32);
    // Narrowing to 32 bits keeps exactly the low word (same as masking with 0xFFFFFFFF).
    const auto low_part = static_cast<UInt32>(value);
    return std::make_pair(high_part, low_part);
}
/// Inverse of decompose_high_low_bytes: places `high` in the upper 32 bits and `low` in the lower 32 bits.
UInt64 DeltaDVRoaringBitmapArray::compose_from_high_low_bytes(UInt32 high, UInt32 low)
{
    const UInt64 upper_word = UInt64{high} << 32;
    const UInt64 lower_word = UInt64{low};
    return upper_word | lower_word;
}
/// Creates an empty bitmap array (no buckets).
DeltaDVRoaringBitmapArray::DeltaDVRoaringBitmapArray() = default;
void DeltaDVRoaringBitmapArray::rb_read(const String & file_path, Int32 offset, Int32 data_size, DB::ContextPtr context)
{
// maybe encode in scala
std::string decoded;
Poco::URI::decode(file_path, decoded);
std::string encoded;
Poco::URI::encode(decoded, "", encoded);
substrait::ReadRel::LocalFiles::FileOrFiles file_info;
file_info.set_uri_file(encoded);
file_info.set_start(offset);
file_info.set_length(data_size);
const Poco::URI file_uri(encoded);
ReadBufferBuilderPtr read_buffer_builder = ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(), context);
auto * in = dynamic_cast<DB::SeekableReadBuffer *>(read_buffer_builder->build(file_info).release());
if (in == nullptr)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Failed to create a valid SeekableReadBuffer.");
in->seek(offset, SEEK_SET);
int size;
readBinaryBigEndian(size, *in);
if (data_size != size)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "The size of the deletion vector is mismatch.");
int checksum_value = static_cast<Int32>(crc32_z(0L, reinterpret_cast<unsigned char *>(in->position()), size));
deserialize(*in);
int expected_checksum;
readBinaryBigEndian(expected_checksum, *in);
if (expected_checksum != checksum_value)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Checksum mismatch.");
}
/// Total number of values stored across all buckets.
UInt64 DeltaDVRoaringBitmapArray::cardinality() const
{
    UInt64 total_values = 0;
    for (const roaring::Roaring & bucket : roaring_bitmap_array)
    {
        total_values += bucket.cardinality();
    }
    return total_values;
}
/// Returns true if `x` is a member of the set.
/// A value whose bucket index lies beyond the array is reported as absent.
bool DeltaDVRoaringBitmapArray::rb_contains(Int64 x) const
{
    const auto [bucket, offset_in_bucket] = decompose_high_low_bytes(x);
    return bucket < roaring_bitmap_array.size() && roaring_bitmap_array[bucket].contains(offset_in_bucket);
}
/// Removes every bucket and releases the vector's storage.
/// A plain clear() would keep the capacity; swapping with an empty vector frees it.
void DeltaDVRoaringBitmapArray::rb_clear()
{
    std::vector<roaring::Roaring> released;
    roaring_bitmap_array.swap(released);
}
bool DeltaDVRoaringBitmapArray::rb_is_empty() const
{
return std::ranges::all_of(roaring_bitmap_array.begin(), roaring_bitmap_array.end(), [](const auto & rb) { return rb.isEmpty(); });
}
/// Adds `x` to the set.
///
/// The upper 32 bits of `x` select a bucket, which is created on demand. The lower 32 bits are
/// stored in that bucket's Roaring bitmap.
/// @throws DB::Exception BAD_ARGUMENTS if `x` is negative or exceeds MAX_REPRESENTABLE_VALUE.
void DeltaDVRoaringBitmapArray::rb_add(Int64 x)
{
    // The former `assert(value >= 0 ...)` tested an unsigned value (always true) and was compiled
    // out in release builds. An out-of-range `x` could then overflow `high + 1` (leaving `high` past
    // the end of the array) or request an enormous resize. Validate on the signed input instead.
    if (x < 0 || static_cast<UInt64>(x) > static_cast<UInt64>(MAX_REPRESENTABLE_VALUE))
        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Value {} is out of the representable range of the deletion vector.", x);

    const auto [high, low] = decompose_high_low_bytes(static_cast<UInt64>(x));
    if (high >= roaring_bitmap_array.size())
        rb_extend_bitmaps(static_cast<Int32>(high + 1));
    roaring_bitmap_array[high].add(low);
}
/// Grows the bucket array to `new_length` empty buckets. Does nothing if the array is already at
/// least that long or if `new_length` is not positive.
void DeltaDVRoaringBitmapArray::rb_extend_bitmaps(Int32 new_length)
{
    // Exclude non-positive lengths before comparing with size(). Otherwise a negative Int32 is
    // converted to a huge size_t and the resize below attempts an enormous allocation.
    if (new_length <= 0 || roaring_bitmap_array.size() >= static_cast<size_t>(new_length))
        return;
    roaring_bitmap_array.resize(static_cast<size_t>(new_length));
}
/// Truncates the bucket array to `new_length` buckets when it is currently longer.
/// A negative `new_length` leaves the array untouched.
void DeltaDVRoaringBitmapArray::rb_shrink_bitmaps(Int32 new_length)
{
    const bool longer_than_requested = new_length >= 0 && static_cast<size_t>(new_length) < roaring_bitmap_array.size();
    if (longer_than_requested)
        roaring_bitmap_array.resize(static_cast<size_t>(new_length));
}
void DeltaDVRoaringBitmapArray::rb_merge(const DeltaDVRoaringBitmapArray & that)
{
rb_or(that);
}
void DeltaDVRoaringBitmapArray::merge(const String & that)
{
DB::ReadBufferFromString rb(that);
DeltaDVRoaringBitmapArray that_bitmap;
that_bitmap.deserialize(rb);
rb_merge(that_bitmap);
}
void DeltaDVRoaringBitmapArray::rb_or(const DeltaDVRoaringBitmapArray & that)
{
if (roaring_bitmap_array.size() < that.roaring_bitmap_array.size())
rb_extend_bitmaps(that.roaring_bitmap_array.size());
const Int32 count = that.roaring_bitmap_array.size();
for (Int32 i = 0; i < count; ++i)
roaring_bitmap_array[i] |= that.roaring_bitmap_array[i];
}
/// Two arrays are equal when they have the same number of buckets and the buckets compare
/// equal pairwise. Comparing an object with itself short-circuits to true.
bool DeltaDVRoaringBitmapArray::operator==(const DeltaDVRoaringBitmapArray & other) const
{
    return this == &other || roaring_bitmap_array == other.roaring_bitmap_array;
}
/// Serializes the array into the layout parsed by deserialize(). All integers are little-endian:
///   [magic: Int32 = 1681511377][count: Int64]
///   count x ([key: Int32][32-bit Roaring bitmap, portable serialization])
/// Every bucket is written, including empty ones, with its position as the key. Run-length
/// optimization is applied to a copy of each bucket, so `*this` is not modified.
String DeltaDVRoaringBitmapArray::serialize() const
{
    constexpr Int32 magic_number = 1681511377;
    DB::WriteBufferFromOwnString out;
    writeBinaryLittleEndian(magic_number, out);
    const auto bitmap_count = static_cast<Int64>(roaring_bitmap_array.size());
    writeBinaryLittleEndian(bitmap_count, out);

    // Scratch buffer reused across buckets. This replaces a heap-allocated Roaring copy plus a
    // naked new char[] per bucket.
    String scratch;
    for (size_t i = 0; i < roaring_bitmap_array.size(); ++i)
    {
        const auto key = static_cast<Int32>(i);
        writeBinaryLittleEndian(key, out);

        roaring::Roaring bitmap = roaring_bitmap_array[i];
        bitmap.runOptimize();
        scratch.resize(bitmap.getSizeInBytes());
        bitmap.write(scratch.data());
        out.write(scratch.data(), scratch.size());
    }
    return out.str();
}
/// Replaces the contents of this array with the bitmaps parsed from `buf`.
///
/// Format (integers little-endian):
///   [magic: Int32 = 1681511377][count: Int64]
///   count x ([key: Int32][32-bit Roaring bitmap, portable serialization])
/// `key` is the bucket index (upper 32 bits of every value in that bitmap). Keys must be strictly
/// increasing; keys that are skipped become empty buckets.
/// Each bitmap is read straight from `buf`'s working buffer, so it must be contiguously available
/// there (as with ReadBufferFromString); otherwise reading fails instead of overrunning memory.
/// If parsing throws, `*this` is left unchanged.
/// @throws DB::Exception BAD_ARGUMENTS on a wrong magic number; TOO_LARGE_ARRAY_SIZE on an invalid
///         count; INCORRECT_DATA on invalid or unordered keys or truncated data. Roaring itself
///         may throw if a bitmap payload is malformed.
void DeltaDVRoaringBitmapArray::deserialize(DB::ReadBuffer & buf)
{
    constexpr Int32 magic_number = 1681511377;
    Int32 magic_num;
    readBinaryLittleEndian(magic_num, buf);
    if (magic_num != magic_number)
        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "The magic num is mismatch.");

    Int64 bitmap_array_size;
    readBinaryLittleEndian(bitmap_array_size, buf);
    // An unchecked count used to be passed straight to reserve(). A negative value was converted
    // to a huge size_t there.
    if (bitmap_array_size < 0 || bitmap_array_size > std::numeric_limits<Int32>::max())
        throw DB::Exception(
            DB::ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Invalid number of bitmaps in deletion vector: {}", bitmap_array_size);

    // Build into a local vector and swap at the end. This gives the strong exception guarantee,
    // and deserializing into a non-empty object replaces its contents instead of appending
    // buckets at the wrong indexes.
    std::vector<roaring::Roaring> bitmaps;
    for (Int64 i = 0; i < bitmap_array_size; ++i)
    {
        Int32 bitmap_index;
        readBinaryLittleEndian(bitmap_index, buf);
        if (bitmap_index < 0 || static_cast<size_t>(bitmap_index) < bitmaps.size())
            throw DB::Exception(DB::ErrorCodes::INCORRECT_DATA, "Invalid or unordered bitmap key {} in deletion vector", bitmap_index);

        // Honor the key: gaps in the key sequence stand for empty buckets.
        bitmaps.resize(static_cast<size_t>(bitmap_index));

        // eof() also refills the working buffer if the key ended exactly at its boundary.
        if (buf.eof())
            throw DB::Exception(
                DB::ErrorCodes::INCORRECT_DATA, "Unexpected end of deletion vector data while reading bitmap {}", bitmap_index);
        // readSafe bounds-checks against the bytes actually available, unlike read().
        roaring::Roaring bitmap = roaring::Roaring::readSafe(buf.position(), buf.available());
        buf.ignore(bitmap.getSizeInBytes());
        bitmaps.push_back(std::move(bitmap));
    }
    roaring_bitmap_array.swap(bitmaps);
}
std::optional<Int64> DeltaDVRoaringBitmapArray::last()
{
if (roaring_bitmap_array.empty() || roaring_bitmap_array.back().isEmpty())
return std::nullopt;
return compose_from_high_low_bytes(roaring_bitmap_array.size(), roaring_bitmap_array.back().maximum());
}
}