blob: 040009c764f350883469d618c3782eae5158a1fe [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/json/chunked_builder.h"
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/json/converter.h"
#include "arrow/table.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/task_group.h"
namespace arrow {
using internal::checked_cast;
using internal::TaskGroup;
namespace json {
class NonNestedChunkedArrayBuilder : public ChunkedArrayBuilder {
public:
NonNestedChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
std::shared_ptr<Converter> converter)
: ChunkedArrayBuilder(task_group), converter_(std::move(converter)) {}
Status Finish(std::shared_ptr<ChunkedArray>* out) override {
RETURN_NOT_OK(task_group_->Finish());
*out = std::make_shared<ChunkedArray>(std::move(chunks_), converter_->out_type());
chunks_.clear();
return Status::OK();
}
Status ReplaceTaskGroup(const std::shared_ptr<TaskGroup>& task_group) override {
RETURN_NOT_OK(task_group_->Finish());
task_group_ = task_group;
return Status::OK();
}
protected:
ArrayVector chunks_;
std::mutex mutex_;
std::shared_ptr<Converter> converter_;
};
class TypedChunkedArrayBuilder
: public NonNestedChunkedArrayBuilder,
public std::enable_shared_from_this<TypedChunkedArrayBuilder> {
public:
using NonNestedChunkedArrayBuilder::NonNestedChunkedArrayBuilder;
void Insert(int64_t block_index, const std::shared_ptr<Field>&,
const std::shared_ptr<Array>& unconverted) override {
std::unique_lock<std::mutex> lock(mutex_);
if (chunks_.size() <= static_cast<size_t>(block_index)) {
chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
}
lock.unlock();
auto self = shared_from_this();
task_group_->Append([self, block_index, unconverted] {
std::shared_ptr<Array> converted;
RETURN_NOT_OK(self->converter_->Convert(unconverted, &converted));
std::unique_lock<std::mutex> lock(self->mutex_);
self->chunks_[block_index] = std::move(converted);
return Status::OK();
});
}
};
class InferringChunkedArrayBuilder
: public NonNestedChunkedArrayBuilder,
public std::enable_shared_from_this<InferringChunkedArrayBuilder> {
public:
InferringChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
const PromotionGraph* promotion_graph,
std::shared_ptr<Converter> converter)
: NonNestedChunkedArrayBuilder(task_group, std::move(converter)),
promotion_graph_(promotion_graph) {}
void Insert(int64_t block_index, const std::shared_ptr<Field>& unconverted_field,
const std::shared_ptr<Array>& unconverted) override {
std::unique_lock<std::mutex> lock(mutex_);
if (chunks_.size() <= static_cast<size_t>(block_index)) {
chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
unconverted_.resize(chunks_.size(), nullptr);
unconverted_fields_.resize(chunks_.size(), nullptr);
}
unconverted_[block_index] = unconverted;
unconverted_fields_[block_index] = unconverted_field;
lock.unlock();
ScheduleConvertChunk(block_index);
}
void ScheduleConvertChunk(int64_t block_index) {
auto self = shared_from_this();
task_group_->Append([self, block_index] {
return self->TryConvertChunk(static_cast<size_t>(block_index));
});
}
Status TryConvertChunk(size_t block_index) {
std::unique_lock<std::mutex> lock(mutex_);
auto converter = converter_;
auto unconverted = unconverted_[block_index];
auto unconverted_field = unconverted_fields_[block_index];
std::shared_ptr<Array> converted;
lock.unlock();
Status st = converter->Convert(unconverted, &converted);
lock.lock();
if (converter != converter_) {
// another task promoted converter; reconvert
lock.unlock();
ScheduleConvertChunk(block_index);
return Status::OK();
}
if (st.ok()) {
// conversion succeeded
chunks_[block_index] = std::move(converted);
return Status::OK();
}
auto promoted_type =
promotion_graph_->Promote(converter_->out_type(), unconverted_field);
if (promoted_type == nullptr) {
// converter failed, no promotion available
return st;
}
RETURN_NOT_OK(MakeConverter(promoted_type, converter_->pool(), &converter_));
size_t nchunks = chunks_.size();
for (size_t i = 0; i < nchunks; ++i) {
if (i != block_index && chunks_[i]) {
// We're assuming the chunk was converted using the wrong type
// (which should be true unless the executor reorders tasks)
chunks_[i].reset();
lock.unlock();
ScheduleConvertChunk(i);
lock.lock();
}
}
lock.unlock();
ScheduleConvertChunk(block_index);
return Status::OK();
}
Status Finish(std::shared_ptr<ChunkedArray>* out) override {
RETURN_NOT_OK(NonNestedChunkedArrayBuilder::Finish(out));
unconverted_.clear();
return Status::OK();
}
private:
ArrayVector unconverted_;
std::vector<std::shared_ptr<Field>> unconverted_fields_;
const PromotionGraph* promotion_graph_;
};
class ChunkedListArrayBuilder : public ChunkedArrayBuilder {
public:
ChunkedListArrayBuilder(const std::shared_ptr<TaskGroup>& task_group, MemoryPool* pool,
std::shared_ptr<ChunkedArrayBuilder> value_builder,
const std::shared_ptr<Field>& value_field)
: ChunkedArrayBuilder(task_group),
pool_(pool),
value_builder_(std::move(value_builder)),
value_field_(value_field) {}
Status ReplaceTaskGroup(const std::shared_ptr<TaskGroup>& task_group) override {
RETURN_NOT_OK(task_group_->Finish());
RETURN_NOT_OK(value_builder_->ReplaceTaskGroup(task_group));
task_group_ = task_group;
return Status::OK();
}
void Insert(int64_t block_index, const std::shared_ptr<Field>&,
const std::shared_ptr<Array>& unconverted) override {
std::unique_lock<std::mutex> lock(mutex_);
if (unconverted->type_id() == Type::NA) {
auto st = InsertNull(block_index, unconverted->length());
if (!st.ok()) {
task_group_->Append([st] { return st; });
}
return;
}
DCHECK_EQ(unconverted->type_id(), Type::LIST);
const auto& list_array = checked_cast<const ListArray&>(*unconverted);
if (null_bitmap_chunks_.size() <= static_cast<size_t>(block_index)) {
null_bitmap_chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
offset_chunks_.resize(null_bitmap_chunks_.size(), nullptr);
}
null_bitmap_chunks_[block_index] = unconverted->null_bitmap();
offset_chunks_[block_index] = list_array.value_offsets();
value_builder_->Insert(block_index, list_array.list_type()->value_field(),
list_array.values());
}
Status Finish(std::shared_ptr<ChunkedArray>* out) override {
RETURN_NOT_OK(task_group_->Finish());
std::shared_ptr<ChunkedArray> value_array;
RETURN_NOT_OK(value_builder_->Finish(&value_array));
auto type = list(value_field_->WithType(value_array->type())->WithMetadata(nullptr));
ArrayVector chunks(null_bitmap_chunks_.size());
for (size_t i = 0; i < null_bitmap_chunks_.size(); ++i) {
auto value_chunk = value_array->chunk(static_cast<int>(i));
auto length = offset_chunks_[i]->size() / sizeof(int32_t) - 1;
chunks[i] = std::make_shared<ListArray>(type, length, offset_chunks_[i],
value_chunk, null_bitmap_chunks_[i]);
}
*out = std::make_shared<ChunkedArray>(std::move(chunks), type);
return Status::OK();
}
private:
// call from Insert() only, with mutex_ locked
Status InsertNull(int64_t block_index, int64_t length) {
value_builder_->Insert(block_index, value_field_, std::make_shared<NullArray>(0));
ARROW_ASSIGN_OR_RAISE(null_bitmap_chunks_[block_index],
AllocateEmptyBitmap(length, pool_));
int64_t offsets_length = (length + 1) * sizeof(int32_t);
ARROW_ASSIGN_OR_RAISE(offset_chunks_[block_index],
AllocateBuffer(offsets_length, pool_));
std::memset(offset_chunks_[block_index]->mutable_data(), 0, offsets_length);
return Status::OK();
}
std::mutex mutex_;
MemoryPool* pool_;
std::shared_ptr<ChunkedArrayBuilder> value_builder_;
BufferVector offset_chunks_, null_bitmap_chunks_;
std::shared_ptr<Field> value_field_;
};
class ChunkedStructArrayBuilder : public ChunkedArrayBuilder {
public:
ChunkedStructArrayBuilder(
const std::shared_ptr<TaskGroup>& task_group, MemoryPool* pool,
const PromotionGraph* promotion_graph,
std::vector<std::pair<std::string, std::shared_ptr<ChunkedArrayBuilder>>>
name_builders)
: ChunkedArrayBuilder(task_group), pool_(pool), promotion_graph_(promotion_graph) {
for (auto&& name_builder : name_builders) {
auto index = static_cast<int>(name_to_index_.size());
name_to_index_.emplace(std::move(name_builder.first), index);
child_builders_.emplace_back(std::move(name_builder.second));
}
}
void Insert(int64_t block_index, const std::shared_ptr<Field>&,
const std::shared_ptr<Array>& unconverted) override {
std::unique_lock<std::mutex> lock(mutex_);
if (null_bitmap_chunks_.size() <= static_cast<size_t>(block_index)) {
null_bitmap_chunks_.resize(static_cast<size_t>(block_index) + 1, nullptr);
chunk_lengths_.resize(null_bitmap_chunks_.size(), -1);
child_absent_.resize(null_bitmap_chunks_.size(), std::vector<bool>(0));
}
null_bitmap_chunks_[block_index] = unconverted->null_bitmap();
chunk_lengths_[block_index] = unconverted->length();
if (unconverted->type_id() == Type::NA) {
auto maybe_buffer = AllocateBitmap(unconverted->length(), pool_);
if (maybe_buffer.ok()) {
null_bitmap_chunks_[block_index] = *std::move(maybe_buffer);
std::memset(null_bitmap_chunks_[block_index]->mutable_data(), 0,
null_bitmap_chunks_[block_index]->size());
} else {
Status st = maybe_buffer.status();
task_group_->Append([st] { return st; });
}
// absent fields will be inserted at Finish
return;
}
const auto& struct_array = checked_cast<const StructArray&>(*unconverted);
if (promotion_graph_ == nullptr) {
// If unexpected fields are ignored or result in an error then all parsers will emit
// columns exclusively in the ordering specified in ParseOptions::explicit_schema,
// so child_builders_ is immutable and no associative lookup is necessary.
for (int i = 0; i < unconverted->num_fields(); ++i) {
child_builders_[i]->Insert(block_index, unconverted->type()->field(i),
struct_array.field(i));
}
} else {
auto st = InsertChildren(block_index, struct_array);
if (!st.ok()) {
return task_group_->Append([st] { return st; });
}
}
}
Status Finish(std::shared_ptr<ChunkedArray>* out) override {
RETURN_NOT_OK(task_group_->Finish());
if (promotion_graph_ != nullptr) {
// insert absent child chunks
for (auto&& name_index : name_to_index_) {
auto child_builder = child_builders_[name_index.second].get();
RETURN_NOT_OK(child_builder->ReplaceTaskGroup(TaskGroup::MakeSerial()));
for (size_t i = 0; i < chunk_lengths_.size(); ++i) {
if (child_absent_[i].size() > static_cast<size_t>(name_index.second) &&
!child_absent_[i][name_index.second]) {
continue;
}
auto empty = std::make_shared<NullArray>(chunk_lengths_[i]);
child_builder->Insert(i, promotion_graph_->Null(name_index.first), empty);
}
}
}
std::vector<std::shared_ptr<Field>> fields(name_to_index_.size());
std::vector<std::shared_ptr<ChunkedArray>> child_arrays(name_to_index_.size());
for (auto&& name_index : name_to_index_) {
auto child_builder = child_builders_[name_index.second].get();
std::shared_ptr<ChunkedArray> child_array;
RETURN_NOT_OK(child_builder->Finish(&child_array));
child_arrays[name_index.second] = child_array;
fields[name_index.second] = field(name_index.first, child_array->type());
}
auto type = struct_(std::move(fields));
ArrayVector chunks(null_bitmap_chunks_.size());
for (size_t i = 0; i < null_bitmap_chunks_.size(); ++i) {
ArrayVector child_chunks;
for (const auto& child_array : child_arrays) {
child_chunks.push_back(child_array->chunk(static_cast<int>(i)));
}
chunks[i] = std::make_shared<StructArray>(type, chunk_lengths_[i], child_chunks,
null_bitmap_chunks_[i]);
}
*out = std::make_shared<ChunkedArray>(std::move(chunks), type);
return Status::OK();
}
Status ReplaceTaskGroup(const std::shared_ptr<TaskGroup>& task_group) override {
RETURN_NOT_OK(task_group_->Finish());
for (auto&& child_builder : child_builders_) {
RETURN_NOT_OK(child_builder->ReplaceTaskGroup(task_group));
}
task_group_ = task_group;
return Status::OK();
}
private:
// Insert children associatively by name; the unconverted block may have unexpected or
// differently ordered fields
// call from Insert() only, with mutex_ locked
Status InsertChildren(int64_t block_index, const StructArray& unconverted) {
const auto& fields = unconverted.type()->fields();
for (int i = 0; i < unconverted.num_fields(); ++i) {
auto it = name_to_index_.find(fields[i]->name());
if (it == name_to_index_.end()) {
// add a new field to this builder
auto type = promotion_graph_->Infer(fields[i]);
DCHECK_NE(type, nullptr)
<< "invalid unconverted_field encountered in conversion: "
<< fields[i]->name() << ":" << *fields[i]->type();
auto new_index = static_cast<int>(name_to_index_.size());
it = name_to_index_.emplace(fields[i]->name(), new_index).first;
std::shared_ptr<ChunkedArrayBuilder> child_builder;
RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph_, type,
&child_builder));
child_builders_.emplace_back(std::move(child_builder));
}
auto unconverted_field = unconverted.type()->field(i);
child_builders_[it->second]->Insert(block_index, unconverted_field,
unconverted.field(i));
child_absent_[block_index].resize(child_builders_.size(), true);
child_absent_[block_index][it->second] = false;
}
return Status::OK();
}
std::mutex mutex_;
MemoryPool* pool_;
const PromotionGraph* promotion_graph_;
std::unordered_map<std::string, int> name_to_index_;
std::vector<std::shared_ptr<ChunkedArrayBuilder>> child_builders_;
std::vector<std::vector<bool>> child_absent_;
BufferVector null_bitmap_chunks_;
std::vector<int64_t> chunk_lengths_;
};
Status MakeChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
MemoryPool* pool, const PromotionGraph* promotion_graph,
const std::shared_ptr<DataType>& type,
std::shared_ptr<ChunkedArrayBuilder>* out) {
if (type->id() == Type::STRUCT) {
std::vector<std::pair<std::string, std::shared_ptr<ChunkedArrayBuilder>>>
child_builders;
for (const auto& f : type->fields()) {
std::shared_ptr<ChunkedArrayBuilder> child_builder;
RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph, f->type(),
&child_builder));
child_builders.emplace_back(f->name(), std::move(child_builder));
}
*out = std::make_shared<ChunkedStructArrayBuilder>(task_group, pool, promotion_graph,
std::move(child_builders));
return Status::OK();
}
if (type->id() == Type::LIST) {
const auto& list_type = checked_cast<const ListType&>(*type);
std::shared_ptr<ChunkedArrayBuilder> value_builder;
RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph,
list_type.value_type(), &value_builder));
*out = std::make_shared<ChunkedListArrayBuilder>(
task_group, pool, std::move(value_builder), list_type.value_field());
return Status::OK();
}
std::shared_ptr<Converter> converter;
RETURN_NOT_OK(MakeConverter(type, pool, &converter));
if (promotion_graph) {
*out = std::make_shared<InferringChunkedArrayBuilder>(task_group, promotion_graph,
std::move(converter));
} else {
*out = std::make_shared<TypedChunkedArrayBuilder>(task_group, std::move(converter));
}
return Status::OK();
}
} // namespace json
} // namespace arrow