// blob: 36e5546a749abdd922e2da6e8d5d9f40c9d079d5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/array/builder_adaptive.h"
#include <algorithm>
#include <cstdint>
#include "arrow/array/data.h"
#include "arrow/buffer.h"
#include "arrow/buffer_builder.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/util/int_util.h"
#include "arrow/util/logging.h"
namespace arrow {
using internal::AdaptiveIntBuilderBase;
AdaptiveIntBuilderBase::AdaptiveIntBuilderBase(uint8_t start_int_size, MemoryPool* pool)
: ArrayBuilder(pool), start_int_size_(start_int_size), int_size_(start_int_size) {}
// Returns the builder to an empty state. The base builder state is reset,
// the value buffer is dropped, pending values are discarded, and the integer
// width goes back to the width given to the constructor.
void AdaptiveIntBuilderBase::Reset() {
  ArrayBuilder::Reset();
  int_size_ = start_int_size_;
  pending_has_nulls_ = false;
  pending_pos_ = 0;
  raw_data_ = nullptr;
  data_ = nullptr;
}
// Ensures the value buffer can hold `capacity` elements of the current width
// (int_size_ bytes each). At least kMinBuilderCapacity elements are allocated.
// The first call allocates the buffer; later calls resize it in place.
// raw_data_ is refreshed because the buffer may have moved.
Status AdaptiveIntBuilderBase::Resize(int64_t capacity) {
  RETURN_NOT_OK(CheckCapacity(capacity));
  const int64_t new_capacity = std::max(capacity, kMinBuilderCapacity);
  const int64_t byte_size = new_capacity * int_size_;

  if (capacity_ != 0) {
    RETURN_NOT_OK(data_->Resize(byte_size));
  } else {
    ARROW_ASSIGN_OR_RAISE(data_, AllocateResizableBuffer(byte_size, pool_));
  }

  raw_data_ = data_->mutable_data();
  return ArrayBuilder::Resize(new_capacity);
}
// Overload selected (via enable_if) when new_type is NOT wider than old_type.
// The buffer already stores elements at least as wide as new_type, so there
// is nothing to widen and this is a no-op.
template <typename new_type, typename old_type>
typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
AdaptiveIntBuilderBase::ExpandIntSizeInternal() {
return Status::OK();
}
// Widens every committed value in the buffer from old_type to new_type, in
// place. Selected (via enable_if) only when new_type is strictly wider than
// old_type.
template <typename new_type, typename old_type>
typename std::enable_if<(sizeof(old_type) < sizeof(new_type)), Status>::type
AdaptiveIntBuilderBase::ExpandIntSizeInternal() {
// Set the new width before Resize(), because Resize() computes the byte size
// from int_size_.
int_size_ = sizeof(new_type);
// Old byte size / old width = element count the buffer held. Keep that many
// elements, now at the new width. Resize() also refreshes raw_data_. The old
// narrow values are presumably preserved at the start of the buffer by
// ResizableBuffer::Resize (TODO confirm).
RETURN_NOT_OK(Resize(data_->size() / sizeof(old_type)));
const old_type* src = reinterpret_cast<old_type*>(raw_data_);
new_type* dst = reinterpret_cast<new_type*>(raw_data_);
// By doing the backward copy, we ensure that no element is overridden during
// the copy process while the copy stays in-place.
// (dst[i] only overlaps src[j] for j >= i. Going from the end, each src[i]
// is read before anything that overlaps it is written.)
std::copy_backward(src, src + length_, dst + length_);
return Status::OK();
}
std::shared_ptr<DataType> AdaptiveUIntBuilder::type() const {
auto int_size = int_size_;
if (pending_pos_ != 0) {
const uint8_t* valid_bytes = pending_has_nulls_ ? pending_valid_ : nullptr;
int_size =
internal::DetectUIntWidth(pending_data_, valid_bytes, pending_pos_, int_size_);
}
switch (int_size) {
case 1:
return uint8();
case 2:
return uint16();
case 4:
return uint32();
case 8:
return uint64();
default:
DCHECK(false);
}
return nullptr;
}
std::shared_ptr<DataType> AdaptiveIntBuilder::type() const {
auto int_size = int_size_;
if (pending_pos_ != 0) {
const uint8_t* valid_bytes = pending_has_nulls_ ? pending_valid_ : nullptr;
int_size = internal::DetectIntWidth(reinterpret_cast<const int64_t*>(pending_data_),
valid_bytes, pending_pos_, int_size_);
}
switch (int_size) {
case 1:
return int8();
case 2:
return int16();
case 4:
return int32();
case 8:
return int64();
default:
DCHECK(false);
}
return nullptr;
}
AdaptiveIntBuilder::AdaptiveIntBuilder(uint8_t start_int_size, MemoryPool* pool)
: AdaptiveIntBuilderBase(start_int_size, pool) {}
// Produces the finished ArrayData in `*out` and leaves the builder empty.
// Pending values are committed first, so int_size_ and length_ are final.
// The value buffer is then trimmed to exactly length_ * int_size_ bytes.
Status AdaptiveIntBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
  RETURN_NOT_OK(CommitPendingData());

  std::shared_ptr<Buffer> validity;
  RETURN_NOT_OK(null_bitmap_builder_.Finish(&validity));

  const int64_t used_bytes = length_ * int_size_;
  RETURN_NOT_OK(TrimBuffer(used_bytes, data_.get()));

  // type() must be evaluated before the builder state is cleared below.
  *out = ArrayData::Make(type(), length_, {validity, data_}, null_count_);

  // Drop the builder's reference to the value buffer and zero the counters.
  data_.reset();
  capacity_ = 0;
  length_ = 0;
  null_count_ = 0;
  return Status::OK();
}
// Moves any values still in the pending buffer into the value buffer, then
// marks the pending buffer empty. Pending values are stored as 64-bit words
// and are reinterpreted as int64 for the signed builder. Does nothing when
// nothing is pending.
Status AdaptiveIntBuilder::CommitPendingData() {
  if (pending_pos_ != 0) {
    RETURN_NOT_OK(Reserve(pending_pos_));
    const uint8_t* validity = pending_has_nulls_ ? pending_valid_ : nullptr;
    const auto* pending_values = reinterpret_cast<const int64_t*>(pending_data_);
    RETURN_NOT_OK(AppendValuesInternal(pending_values, pending_pos_, validity));
    pending_pos_ = 0;
    pending_has_nulls_ = false;
  }
  return Status::OK();
}
// Chunk length, in values, used by the AppendValuesInternal methods below.
// Width detection and the narrowing copy are done one chunk at a time.
static constexpr int64_t kAdaptiveIntChunkSize = 8192;
// Writes `length` signed values into the value buffer, starting at slot
// length_. Each value is narrowed to int_size_ bytes. When a chunk needs more
// bytes per value, the buffer is widened first via ExpandIntSize. Validity
// bits for the same values are appended through UnsafeAppendToBitmap, which
// also advances length_.
//
// Preconditions:
// - Capacity for the values has been reserved. Both callers in this file call
//   Reserve() first.
// - If the pending buffer is non-empty, this call must commit exactly those
//   pending values. This is checked by DCHECK_EQ below.
//
// values:      input values, `length` of them.
// length:      number of values to append.
// valid_bytes: per-value validity bytes, or nullptr. nullptr is forwarded to
//              UnsafeAppendToBitmap and presumably means "all valid".
Status AdaptiveIntBuilder::AppendValuesInternal(const int64_t* values, int64_t length,
const uint8_t* valid_bytes) {
if (pending_pos_ > 0) {
// UnsafeAppendToBitmap expects length_ to be the pre-update value, satisfy it
// (length_ is rewound by pending_pos_ because it already counted the pending
// values before they reached the buffer).
DCHECK_EQ(length, pending_pos_) << "AppendValuesInternal called while data pending";
length_ -= pending_pos_;
}
while (length > 0) {
// In case `length` is very large, we don't want to trash the cache by
// scanning it twice (first to detect int width, second to copy the data).
// Instead, process data in L2-cacheable chunks.
const int64_t chunk_size = std::min(length, kAdaptiveIntChunkSize);
uint8_t new_int_size;
// int_size_ is passed as the minimum width, so presumably the detected width
// never shrinks (asserted right below).
new_int_size = internal::DetectIntWidth(values, valid_bytes, chunk_size, int_size_);
DCHECK_GE(new_int_size, int_size_);
if (new_int_size > int_size_) {
// This updates int_size_
RETURN_NOT_OK(ExpandIntSize(new_int_size));
}
// Narrow this chunk into the buffer, viewed as an array of the current
// width, starting at the first free slot (length_).
switch (int_size_) {
case 1:
internal::DowncastInts(values, reinterpret_cast<int8_t*>(raw_data_) + length_,
chunk_size);
break;
case 2:
internal::DowncastInts(values, reinterpret_cast<int16_t*>(raw_data_) + length_,
chunk_size);
break;
case 4:
internal::DowncastInts(values, reinterpret_cast<int32_t*>(raw_data_) + length_,
chunk_size);
break;
case 8:
internal::DowncastInts(values, reinterpret_cast<int64_t*>(raw_data_) + length_,
chunk_size);
break;
default:
DCHECK(false);
}
// UnsafeAppendToBitmap increments length_ by chunk_size
ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, chunk_size);
// Advance the input cursors past the consumed chunk.
values += chunk_size;
if (valid_bytes != nullptr) {
valid_bytes += chunk_size;
}
length -= chunk_size;
}
return Status::OK();
}
// Moves any values still in the pending buffer into the value buffer, then
// marks the pending buffer empty. Does nothing when nothing is pending.
Status AdaptiveUIntBuilder::CommitPendingData() {
  if (pending_pos_ != 0) {
    RETURN_NOT_OK(Reserve(pending_pos_));
    const uint8_t* validity = pending_has_nulls_ ? pending_valid_ : nullptr;
    RETURN_NOT_OK(AppendValuesInternal(pending_data_, pending_pos_, validity));
    pending_pos_ = 0;
    pending_has_nulls_ = false;
  }
  return Status::OK();
}
// Appends `length` signed values with an optional per-value validity array.
// `valid_bytes` may be nullptr.
// Values still in the pending buffer are committed first. This keeps the
// output in order and hands AppendValuesInternal() an empty pending buffer.
// Capacity for `length` values is then reserved, as AppendValuesInternal()
// requires.
Status AdaptiveIntBuilder::AppendValues(const int64_t* values, int64_t length,
const uint8_t* valid_bytes) {
RETURN_NOT_OK(CommitPendingData());
RETURN_NOT_OK(Reserve(length));
return AppendValuesInternal(values, length, valid_bytes);
}
// Widens the buffer to new_type, dispatching on the current signed width
// (int_size_) to pick the old element type. Returns OK (after a debug check
// fails) for any other width.
template <typename new_type>
Status AdaptiveIntBuilder::ExpandIntSizeN() {
  if (int_size_ == 1) return ExpandIntSizeInternal<new_type, int8_t>();
  if (int_size_ == 2) return ExpandIntSizeInternal<new_type, int16_t>();
  if (int_size_ == 4) return ExpandIntSizeInternal<new_type, int32_t>();
  if (int_size_ == 8) return ExpandIntSizeInternal<new_type, int64_t>();

  DCHECK(false);
  return Status::OK();
}
// Widens the committed signed values to `new_int_size` bytes (1, 2, 4 or 8).
// This selects the target element type; ExpandIntSizeN then selects the
// current one. Returns OK (after a debug check fails) for any other size.
Status AdaptiveIntBuilder::ExpandIntSize(uint8_t new_int_size) {
  if (new_int_size == 1) return ExpandIntSizeN<int8_t>();
  if (new_int_size == 2) return ExpandIntSizeN<int16_t>();
  if (new_int_size == 4) return ExpandIntSizeN<int32_t>();
  if (new_int_size == 8) return ExpandIntSizeN<int64_t>();

  DCHECK(false);
  return Status::OK();
}
AdaptiveUIntBuilder::AdaptiveUIntBuilder(uint8_t start_int_size, MemoryPool* pool)
: AdaptiveIntBuilderBase(start_int_size, pool) {}
// Produces the finished ArrayData in `*out` and leaves the builder empty.
// Pending values are committed first, so int_size_ and length_ are final.
// The value buffer is then trimmed to exactly length_ * int_size_ bytes.
Status AdaptiveUIntBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
  RETURN_NOT_OK(CommitPendingData());

  std::shared_ptr<Buffer> validity;
  RETURN_NOT_OK(null_bitmap_builder_.Finish(&validity));

  const int64_t used_bytes = length_ * int_size_;
  RETURN_NOT_OK(TrimBuffer(used_bytes, data_.get()));

  // type() must be evaluated before the builder state is cleared below.
  *out = ArrayData::Make(type(), length_, {validity, data_}, null_count_);

  // Drop the builder's reference to the value buffer and zero the counters.
  data_.reset();
  capacity_ = 0;
  length_ = 0;
  null_count_ = 0;
  return Status::OK();
}
// Writes `length` unsigned values into the value buffer, starting at slot
// length_. Each value is narrowed to int_size_ bytes. When a chunk needs more
// bytes per value, the buffer is widened first via ExpandIntSize. Validity
// bits for the same values are appended through UnsafeAppendToBitmap, which
// also advances length_.
//
// Preconditions: capacity for the values has been reserved by the caller. If
// the pending buffer is non-empty, this call must commit exactly those
// pending values. This is checked by DCHECK_EQ below.
//
// values:      input values, `length` of them.
// length:      number of values to append.
// valid_bytes: per-value validity bytes, or nullptr. nullptr is forwarded to
//              UnsafeAppendToBitmap and presumably means "all valid".
Status AdaptiveUIntBuilder::AppendValuesInternal(const uint64_t* values, int64_t length,
const uint8_t* valid_bytes) {
if (pending_pos_ > 0) {
// UnsafeAppendToBitmap expects length_ to be the pre-update value, satisfy it
// (length_ is rewound by pending_pos_ because it already counted the pending
// values before they reached the buffer).
DCHECK_EQ(length, pending_pos_) << "AppendValuesInternal called while data pending";
length_ -= pending_pos_;
}
while (length > 0) {
// See AdaptiveIntBuilder::AppendValuesInternal
// (input is processed in chunks of at most kAdaptiveIntChunkSize values, so
// each chunk is scanned for width and copied while still cache-resident).
const int64_t chunk_size = std::min(length, kAdaptiveIntChunkSize);
uint8_t new_int_size;
// int_size_ is passed as the minimum width, so presumably the detected width
// never shrinks (asserted right below).
new_int_size = internal::DetectUIntWidth(values, valid_bytes, chunk_size, int_size_);
DCHECK_GE(new_int_size, int_size_);
if (new_int_size > int_size_) {
// This updates int_size_
RETURN_NOT_OK(ExpandIntSize(new_int_size));
}
// Narrow this chunk into the buffer, viewed as an array of the current
// width, starting at the first free slot (length_).
switch (int_size_) {
case 1:
internal::DowncastUInts(values, reinterpret_cast<uint8_t*>(raw_data_) + length_,
chunk_size);
break;
case 2:
internal::DowncastUInts(values, reinterpret_cast<uint16_t*>(raw_data_) + length_,
chunk_size);
break;
case 4:
internal::DowncastUInts(values, reinterpret_cast<uint32_t*>(raw_data_) + length_,
chunk_size);
break;
case 8:
internal::DowncastUInts(values, reinterpret_cast<uint64_t*>(raw_data_) + length_,
chunk_size);
break;
default:
DCHECK(false);
}
// UnsafeAppendToBitmap increments length_ by chunk_size
ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, chunk_size);
// Advance the input cursors past the consumed chunk.
values += chunk_size;
if (valid_bytes != nullptr) {
valid_bytes += chunk_size;
}
length -= chunk_size;
}
return Status::OK();
}
// Appends `length` unsigned values with an optional per-value validity array.
// `valid_bytes` may be nullptr. Each value is narrowed to the builder's
// current width, and the builder is widened first if a value does not fit.
Status AdaptiveUIntBuilder::AppendValues(const uint64_t* values, int64_t length,
                                         const uint8_t* valid_bytes) {
  // Values still in the pending buffer must be committed first.
  // AppendValuesInternal() treats a non-empty pending buffer as a request to
  // commit exactly those values: it rewinds length_ by pending_pos_. Without
  // this call, the new values would overwrite the pending ones, and the
  // pending ones would later be committed out of order. This mirrors
  // AdaptiveIntBuilder::AppendValues.
  RETURN_NOT_OK(CommitPendingData());
  RETURN_NOT_OK(Reserve(length));
  return AppendValuesInternal(values, length, valid_bytes);
}
// Widens the buffer to new_type, dispatching on the current unsigned width
// (int_size_) to pick the old element type. Returns OK (after a debug check
// fails) for any other width.
template <typename new_type>
Status AdaptiveUIntBuilder::ExpandIntSizeN() {
  if (int_size_ == 1) return ExpandIntSizeInternal<new_type, uint8_t>();
  if (int_size_ == 2) return ExpandIntSizeInternal<new_type, uint16_t>();
  if (int_size_ == 4) return ExpandIntSizeInternal<new_type, uint32_t>();
  if (int_size_ == 8) return ExpandIntSizeInternal<new_type, uint64_t>();

  DCHECK(false);
  return Status::OK();
}
// Widens the committed unsigned values to `new_int_size` bytes (1, 2, 4 or 8).
// This selects the target element type; ExpandIntSizeN then selects the
// current one. Returns OK (after a debug check fails) for any other size.
Status AdaptiveUIntBuilder::ExpandIntSize(uint8_t new_int_size) {
  if (new_int_size == 1) return ExpandIntSizeN<uint8_t>();
  if (new_int_size == 2) return ExpandIntSizeN<uint16_t>();
  if (new_int_size == 4) return ExpandIntSizeN<uint32_t>();
  if (new_int_size == 8) return ExpandIntSizeN<uint64_t>();

  DCHECK(false);
  return Status::OK();
}
} // namespace arrow