// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/json/reader.h"

#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/io/interfaces.h"
#include "arrow/json/chunked_builder.h"
#include "arrow/json/chunker.h"
#include "arrow/json/converter.h"
#include "arrow/json/parser.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/string_view.h"
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"
namespace arrow {
using util::string_view;
using internal::checked_cast;
using internal::GetCpuThreadPool;
using internal::TaskGroup;
using internal::ThreadPool;
namespace json {
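
// Implements TableReader by splitting the input stream into fixed-size blocks,
// re-chunking each block at JSON object boundaries, and parsing/converting the
// chunks in parallel through a TaskGroup. Objects that straddle block
// boundaries are reassembled from the "partial" tail of one block and the
// "completion" head of the next before being parsed.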
class TableReaderImpl : public TableReader,
public std::enable_shared_from_this<TableReaderImpl> {
public:
TableReaderImpl(MemoryPool* pool, const ReadOptions& read_options,
const ParseOptions& parse_options,
std::shared_ptr<TaskGroup> task_group)
: pool_(pool),
read_options_(read_options),
parse_options_(parse_options),
chunker_(MakeChunker(parse_options_)),
task_group_(std::move(task_group)) {}
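
  // Sets up a readahead iterator over blocks of read_options_.block_size
  // bytes; up to task_group_->parallelism() blocks are prefetched.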
Status Init(std::shared_ptr<io::InputStream> input) {
ARROW_ASSIGN_OR_RAISE(auto it,
io::MakeInputStreamIterator(input, read_options_.block_size));
return MakeReadaheadIterator(std::move(it), task_group_->parallelism())
.Value(&block_iterator_);
}
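
  // Reads the entire stream: each block is chunked into the completion of the
  // previous block's trailing partial object, a run of whole objects, and a
  // new trailing partial object. A parse task is launched per block and the
  // resulting chunks are assembled into a single Table.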
Result<std::shared_ptr<Table>> Read() override {
RETURN_NOT_OK(MakeBuilder());
ARROW_ASSIGN_OR_RAISE(auto block, block_iterator_.Next());
if (block == nullptr) {
return Status::Invalid("Empty JSON file");
}
auto self = shared_from_this();
auto empty = std::make_shared<Buffer>("");
int64_t block_index = 0;
std::shared_ptr<Buffer> partial = empty;
while (block != nullptr) {
std::shared_ptr<Buffer> next_block, whole, completion, next_partial;
ARROW_ASSIGN_OR_RAISE(next_block, block_iterator_.Next());
if (next_block == nullptr) {
// End of file reached => compute completion from penultimate block
RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, &whole));
} else {
std::shared_ptr<Buffer> starts_with_whole;
// Get completion of partial from previous block.
RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion,
&starts_with_whole));
// Get all whole objects entirely inside the current buffer
RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
}
// Launch parse task
task_group_->Append([self, partial, completion, whole, block_index] {
return self->ParseAndInsert(partial, completion, whole, block_index);
});
block_index++;
partial = next_partial;
block = next_block;
}
std::shared_ptr<ChunkedArray> array;
RETURN_NOT_OK(builder_->Finish(&array));
return Table::FromChunkedStructArray(array);
  }

 private:
  // Builds the top-level chunked struct-array builder; unexpected JSON fields
  // are type-inferred only when UnexpectedFieldBehavior::InferType is
  // selected, otherwise only the explicit schema (or an empty struct) is used.
  Status MakeBuilder() {
auto type = parse_options_.explicit_schema
? struct_(parse_options_.explicit_schema->fields())
: struct_({});
auto promotion_graph =
parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
? GetPromotionGraph()
: nullptr;
return MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph, type, &builder_);
}
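
  // Parses the straddling object (partial + completion) together with the
  // whole-object region of a block, then inserts the parsed chunk into the
  // chunked builder at its block index so output order is preserved.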
Status ParseAndInsert(const std::shared_ptr<Buffer>& partial,
const std::shared_ptr<Buffer>& completion,
const std::shared_ptr<Buffer>& whole, int64_t block_index) {
std::unique_ptr<BlockParser> parser;
RETURN_NOT_OK(BlockParser::Make(pool_, parse_options_, &parser));
RETURN_NOT_OK(parser->ReserveScalarStorage(partial->size() + completion->size() +
whole->size()));
if (partial->size() != 0 || completion->size() != 0) {
std::shared_ptr<Buffer> straddling;
if (partial->size() == 0) {
straddling = completion;
} else if (completion->size() == 0) {
straddling = partial;
} else {
ARROW_ASSIGN_OR_RAISE(straddling,
ConcatenateBuffers({partial, completion}, pool_));
}
RETURN_NOT_OK(parser->Parse(straddling));
}
if (whole->size() != 0) {
RETURN_NOT_OK(parser->Parse(whole));
}
std::shared_ptr<Array> parsed;
RETURN_NOT_OK(parser->Finish(&parsed));
builder_->Insert(block_index, field("", parsed->type()), parsed);
return Status::OK();
  }

  MemoryPool* pool_;
ReadOptions read_options_;
ParseOptions parse_options_;
std::unique_ptr<Chunker> chunker_;
std::shared_ptr<TaskGroup> task_group_;
Iterator<std::shared_ptr<Buffer>> block_iterator_;
std::shared_ptr<ChunkedArrayBuilder> builder_;
};

// Status-returning convenience overload that forwards to the Result-based
// Read().
Status TableReader::Read(std::shared_ptr<Table>* out) { return Read().Value(out); }
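
// Example usage (sketch), assuming a caller-provided, already-open
// io::InputStream `input` and the default option factories:
//
//   ARROW_ASSIGN_OR_RAISE(auto reader,
//                         TableReader::Make(default_memory_pool(), input,
//                                           ReadOptions::Defaults(),
//                                           ParseOptions::Defaults()));
//   ARROW_ASSIGN_OR_RAISE(auto table, reader->Read());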
Result<std::shared_ptr<TableReader>> TableReader::Make(
MemoryPool* pool, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options) {
std::shared_ptr<TableReaderImpl> ptr;
if (read_options.use_threads) {
ptr = std::make_shared<TableReaderImpl>(pool, read_options, parse_options,
TaskGroup::MakeThreaded(GetCpuThreadPool()));
} else {
ptr = std::make_shared<TableReaderImpl>(pool, read_options, parse_options,
TaskGroup::MakeSerial());
}
RETURN_NOT_OK(ptr->Init(input));
return ptr;
}

// Status-returning overload that forwards to the Result-based factory above.
Status TableReader::Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options,
const ParseOptions& parse_options,
std::shared_ptr<TableReader>* out) {
return TableReader::Make(pool, input, read_options, parse_options).Value(out);
}
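
// Parses a single in-memory JSON buffer into a RecordBatch on the calling
// thread, using a serial task group and the default memory pool.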
Result<std::shared_ptr<RecordBatch>> ParseOne(ParseOptions options,
std::shared_ptr<Buffer> json) {
std::unique_ptr<BlockParser> parser;
RETURN_NOT_OK(BlockParser::Make(options, &parser));
RETURN_NOT_OK(parser->Parse(json));
std::shared_ptr<Array> parsed;
RETURN_NOT_OK(parser->Finish(&parsed));
auto type =
options.explicit_schema ? struct_(options.explicit_schema->fields()) : struct_({});
auto promotion_graph =
options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
? GetPromotionGraph()
: nullptr;
std::shared_ptr<ChunkedArrayBuilder> builder;
RETURN_NOT_OK(MakeChunkedArrayBuilder(TaskGroup::MakeSerial(), default_memory_pool(),
promotion_graph, type, &builder));
builder->Insert(0, field("", type), parsed);
std::shared_ptr<ChunkedArray> converted_chunked;
RETURN_NOT_OK(builder->Finish(&converted_chunked));
const auto& converted = checked_cast<const StructArray&>(*converted_chunked->chunk(0));
std::vector<std::shared_ptr<Array>> columns(converted.num_fields());
for (int i = 0; i < converted.num_fields(); ++i) {
columns[i] = converted.field(i);
}
return RecordBatch::Make(schema(converted.type()->fields()), converted.length(),
std::move(columns));
}

}  // namespace json
}  // namespace arrow