blob: 568246bb63ba6b128751a2c376d93c0495155878 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/json/chunker.h"
#include <algorithm>
#include <utility>
#include <vector>
#include "arrow/json/rapidjson_defs.h"
#include "rapidjson/reader.h"
#include "arrow/buffer.h"
#include "arrow/json/options.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/string_view.h"
namespace arrow {
using internal::make_unique;
using util::string_view;
namespace json {
namespace rj = arrow::rapidjson;
static size_t ConsumeWhitespace(string_view view) {
#ifdef RAPIDJSON_SIMD
auto data = view.data();
auto nonws_begin = rj::SkipWhitespace_SIMD(data, data + view.size());
return nonws_begin - data;
#else
auto ws_count = view.find_first_not_of(" \t\r\n");
if (ws_count == string_view::npos) {
return view.size();
} else {
return ws_count;
}
#endif
}
/// RapidJson custom stream for reading JSON stored in multiple buffers
/// http://rapidjson.org/md_doc_stream.html#CustomStream
class MultiStringStream {
public:
using Ch = char;
explicit MultiStringStream(std::vector<string_view> strings)
: strings_(std::move(strings)) {
std::reverse(strings_.begin(), strings_.end());
}
explicit MultiStringStream(const BufferVector& buffers) : strings_(buffers.size()) {
for (size_t i = 0; i < buffers.size(); ++i) {
strings_[i] = string_view(*buffers[i]);
}
std::reverse(strings_.begin(), strings_.end());
}
char Peek() const {
if (strings_.size() == 0) return '\0';
return strings_.back()[0];
}
char Take() {
if (strings_.size() == 0) return '\0';
char taken = strings_.back()[0];
if (strings_.back().size() == 1) {
strings_.pop_back();
} else {
strings_.back() = strings_.back().substr(1);
}
++index_;
return taken;
}
size_t Tell() { return index_; }
void Put(char) { ARROW_LOG(FATAL) << "not implemented"; }
void Flush() { ARROW_LOG(FATAL) << "not implemented"; }
char* PutBegin() {
ARROW_LOG(FATAL) << "not implemented";
return nullptr;
}
size_t PutEnd(char*) {
ARROW_LOG(FATAL) << "not implemented";
return 0;
}
private:
size_t index_ = 0;
std::vector<string_view> strings_;
};
template <typename Stream>
static size_t ConsumeWholeObject(Stream&& stream) {
static constexpr unsigned parse_flags = rj::kParseIterativeFlag |
rj::kParseStopWhenDoneFlag |
rj::kParseNumbersAsStringsFlag;
rj::BaseReaderHandler<rj::UTF8<>> handler;
rj::Reader reader;
// parse a single JSON object
switch (reader.Parse<parse_flags>(stream, handler).Code()) {
case rj::kParseErrorNone:
return stream.Tell();
case rj::kParseErrorDocumentEmpty:
return 0;
default:
// rapidjson emitted an error, the most recent object was partial
return string_view::npos;
}
}
namespace {
// A BoundaryFinder implementation that assumes JSON objects can contain raw newlines,
// and uses actual JSON parsing to delimit them.
class ParsingBoundaryFinder : public BoundaryFinder {
public:
Status FindFirst(string_view partial, string_view block, int64_t* out_pos) override {
// NOTE: We could bubble up JSON parse errors here, but the actual parsing
// step will detect them later anyway.
auto length = ConsumeWholeObject(MultiStringStream({partial, block}));
if (length == string_view::npos) {
*out_pos = -1;
} else {
DCHECK_GE(length, partial.size());
DCHECK_LE(length, partial.size() + block.size());
*out_pos = static_cast<int64_t>(length - partial.size());
}
return Status::OK();
}
Status FindLast(util::string_view block, int64_t* out_pos) override {
const size_t block_length = block.size();
size_t consumed_length = 0;
while (consumed_length < block_length) {
rj::MemoryStream ms(reinterpret_cast<const char*>(block.data()), block.size());
using InputStream = rj::EncodedInputStream<rj::UTF8<>, rj::MemoryStream>;
auto length = ConsumeWholeObject(InputStream(ms));
if (length == string_view::npos || length == 0) {
// found incomplete object or block is empty
break;
}
consumed_length += length;
block = block.substr(length);
}
if (consumed_length == 0) {
*out_pos = -1;
} else {
consumed_length += ConsumeWhitespace(block);
DCHECK_LE(consumed_length, block_length);
*out_pos = static_cast<int64_t>(consumed_length);
}
return Status::OK();
}
};
} // namespace
std::unique_ptr<Chunker> MakeChunker(const ParseOptions& options) {
std::shared_ptr<BoundaryFinder> delimiter;
if (options.newlines_in_values) {
delimiter = std::make_shared<ParsingBoundaryFinder>();
} else {
delimiter = MakeNewlineBoundaryFinder();
}
return std::unique_ptr<Chunker>(new Chunker(std::move(delimiter)));
}
} // namespace json
} // namespace arrow