blob: c91f0c149abbc9c142450f255246b1a2266aad99 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
Roaring Bitmap.
"""
from typing import Iterator, Set
import struct
class RoaringBitmap64:
"""
A 64-bit roaring bitmap implementation.
This class provides efficient storage and operations for sets of 64-bit integers.
It uses a set-based implementation for simplicity, which can be replaced with
a more efficient roaring bitmap library if needed.
"""
def __init__(self):
self._data: Set[int] = set()
def add(self, value: int) -> None:
"""Add a single value to the bitmap."""
self._data.add(value)
def add_range(self, from_: int, to: int) -> None:
"""Add a range of values [from_, to] to the bitmap."""
for i in range(from_, to + 1):
self._data.add(i)
def contains(self, value: int) -> bool:
"""Check if the bitmap contains the given value."""
return value in self._data
def is_empty(self) -> bool:
"""Check if the bitmap is empty."""
return len(self._data) == 0
def cardinality(self) -> int:
"""Return the number of elements in the bitmap."""
return len(self._data)
def __iter__(self) -> Iterator[int]:
"""Iterate over all values in the bitmap in sorted order."""
return iter(sorted(self._data))
def __len__(self) -> int:
"""Return the number of elements in the bitmap."""
return len(self._data)
def __contains__(self, value: int) -> bool:
"""Check if the bitmap contains the given value."""
return self.contains(value)
def clear(self) -> None:
"""Clear all values from the bitmap."""
self._data.clear()
def to_list(self) -> list:
"""Return a sorted list of all values in the bitmap."""
return sorted(self._data)
def to_range_list(self) -> list:
"""
Convert the bitmap to a list of Range objects.
"""
from pypaimon.globalindex.range import Range
if self.is_empty():
return []
sorted_values = sorted(self._data)
ranges = []
start = sorted_values[0]
end = start
for i in range(1, len(sorted_values)):
if sorted_values[i] == end + 1:
# Consecutive, extend the range
end = sorted_values[i]
else:
# Gap, close current range and start new one
ranges.append(Range(start, end))
start = sorted_values[i]
end = start
# Add the last range
ranges.append(Range(start, end))
return ranges
@staticmethod
def and_(a: 'RoaringBitmap64', b: 'RoaringBitmap64') -> 'RoaringBitmap64':
"""Return the intersection of two bitmaps."""
result = RoaringBitmap64()
result._data = a._data & b._data
return result
@staticmethod
def or_(a: 'RoaringBitmap64', b: 'RoaringBitmap64') -> 'RoaringBitmap64':
"""Return the union of two bitmaps."""
result = RoaringBitmap64()
result._data = a._data | b._data
return result
@staticmethod
def remove_all(a: 'RoaringBitmap64', b: 'RoaringBitmap64') -> 'RoaringBitmap64':
result = RoaringBitmap64()
result._data = a._data - b._data
return result
def serialize(self) -> bytes:
"""Serialize the bitmap to bytes."""
# Simple serialization format: count followed by sorted values
values = sorted(self._data)
data = struct.pack('>Q', len(values)) # 8-byte count
for v in values:
data += struct.pack('>q', v) # 8-byte signed value
return data
@staticmethod
def deserialize(data: bytes) -> 'RoaringBitmap64':
"""Deserialize a bitmap from bytes."""
result = RoaringBitmap64()
count = struct.unpack('>Q', data[:8])[0]
offset = 8
for _ in range(count):
value = struct.unpack('>q', data[offset:offset + 8])[0]
result.add(value)
offset += 8
return result
def __eq__(self, other: object) -> bool:
if not isinstance(other, RoaringBitmap64):
return False
return self._data == other._data
def __hash__(self) -> int:
return hash(frozenset(self._data))
def __repr__(self) -> str:
if len(self._data) <= 10:
return f"RoaringBitmap64({sorted(self._data)})"
return f"RoaringBitmap64({len(self._data)} elements)"