blob: 2e8799842ba703b11c0b724900c7155d53e258b0 [file] [log] [blame]
/*
* Copyright 2024-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "paimon/common/reader/reader_utils.h"
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>
#include "arrow/api.h"
#include "arrow/array/array_base.h"
#include "arrow/c/abi.h"
#include "arrow/c/bridge.h"
#include "arrow/c/helpers.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/status.h"
#include "paimon/utils/roaring_bitmap32.h"
namespace paimon {
Result<arrow::ArrayVector> ReaderUtils::GenerateFilteredArrayVector(
const std::shared_ptr<arrow::Array>& src_array, const RoaringBitmap32& bitmap) {
if (bitmap.Cardinality() == 0) {
return Status::Invalid("selection bitmap cannot be empty in GenerateFilteredArrayVector");
}
arrow::ArrayVector array_vec;
auto valid_iter = bitmap.Begin();
int32_t pos = 0;
while (valid_iter != bitmap.End() && pos < src_array->length()) {
int64_t valid_start_pos = *valid_iter;
for (pos = *valid_iter; pos < src_array->length() && valid_iter != bitmap.End();
pos++, ++valid_iter) {
if (pos != *valid_iter) {
break;
}
}
int64_t valid_end_pos = pos;
array_vec.push_back(src_array->Slice(valid_start_pos, valid_end_pos - valid_start_pos));
}
assert(!array_vec.empty());
return array_vec;
}
void ReaderUtils::ReleaseReadBatch(BatchReader::ReadBatch&& batch) {
auto& [c_array, c_schema] = batch;
if (c_array) {
ArrowArrayRelease(c_array.get());
}
if (c_schema) {
ArrowSchemaRelease(c_schema.get());
}
}
Result<BatchReader::ReadBatch> ReaderUtils::ApplyBitmapToReadBatch(
BatchReader::ReadBatchWithBitmap&& batch_with_bitmap, arrow::MemoryPool* arrow_pool) {
if (BatchReader::IsEofBatch(batch_with_bitmap)) {
return std::move(batch_with_bitmap.first);
}
BatchReader::ReadBatchWithBitmap moved_batch_with_bitmap = std::move(batch_with_bitmap);
auto& [batch, bitmap] = moved_batch_with_bitmap;
auto& [c_array, c_schema] = batch;
assert(c_array);
if (bitmap.IsEmpty()) {
ReleaseReadBatch(std::move(batch));
return Status::Invalid(
"NextBatchWithBitmap should always return the result with at least one valid row "
"except eof");
}
if (bitmap.Cardinality() == c_array->length) {
// indicates all rows in batch are valid
return std::move(batch);
}
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array,
arrow::ImportArray(c_array.get(), c_schema.get()));
PAIMON_ASSIGN_OR_RAISE(arrow::ArrayVector array_vec,
GenerateFilteredArrayVector(arrow_array, bitmap));
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> result,
arrow::Concatenate(array_vec, arrow_pool));
assert(result && result->length() > 0);
std::unique_ptr<ArrowArray> result_c_array = std::make_unique<ArrowArray>();
std::unique_ptr<ArrowSchema> result_c_schema = std::make_unique<ArrowSchema>();
PAIMON_RETURN_NOT_OK_FROM_ARROW(
arrow::ExportArray(*result, result_c_array.get(), result_c_schema.get()));
return make_pair(std::move(result_c_array), std::move(result_c_schema));
}
BatchReader::ReadBatchWithBitmap ReaderUtils::AddAllValidBitmap(BatchReader::ReadBatch&& batch) {
if (BatchReader::IsEofBatch(batch)) {
return BatchReader::MakeEofBatchWithBitmap();
}
RoaringBitmap32 all_valid;
all_valid.AddRange(0, batch.first->length);
return std::make_pair(std::move(batch), std::move(all_valid));
}
} // namespace paimon