// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/json/hdfs-json-scanner.h"
#include "common/names.h"
#include "common/status.h"
#include "exec/hdfs-scan-node.h"
#include "exec/scanner-context.inline.h"
#include "exec/text-converter.h"
#include "exec/text-converter.inline.h"
#include "runtime/multi-precision.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
#include "util/decompress.h"
#include "util/debug-util.h"
#include "util/scope-exit-trigger.h"
#include "util/string-parser.h"
#include <gutil/strings/substitute.h>
#include <map>
using namespace impala;
using namespace impala::io;
using namespace strings;
DEFINE_bool(enable_json_scanner, true,
"If set to false, disables reading from JSON format tables.");
HdfsJsonScanner::HdfsJsonScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
: HdfsScanner(scan_node, state),
scanner_state_(CREATED),
buffer_status_(Status::OK()),
tuple_row_(nullptr),
current_pool_(nullptr),
error_in_row_(false),
num_tuples_materialized_(0),
parse_json_timer_(nullptr),
get_buffer_timer_(nullptr) { }
HdfsJsonScanner::~HdfsJsonScanner() { }
Status HdfsJsonScanner::Open(ScannerContext* context) {
DCHECK_EQ(scanner_state_, CREATED);
RETURN_IF_ERROR(HdfsScanner::Open(context));
parse_json_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "ParseJsonTime");
get_buffer_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "GetDataBufferTime");
RETURN_IF_ERROR(InitNewRange());
scanner_state_ = OPENED;
return Status::OK();
}
void HdfsJsonScanner::Close(RowBatch* row_batch) {
DCHECK(!is_closed_);
// Need to close the decompressor before transferring the remaining resources to
// 'row_batch' because in some cases there is memory allocated in the decompressor_'s
// temp_memory_pool_.
if (decompressor_ != nullptr) {
decompressor_->Close();
decompressor_.reset();
}
if (row_batch != nullptr) {
row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
if (scan_node_->HasRowBatchQueue()) {
static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
std::unique_ptr<RowBatch>(row_batch));
}
} else {
template_tuple_pool_->FreeAll();
}
// The JsonParser always copies values instead of referencing them, so it doesn't
// reference the data in the data_buffer_pool_. Therefore, 'row_batch' does not need to
// acquire data from the data_buffer_pool_, and we can always call FreeAll().
data_buffer_pool_->FreeAll();
context_->ReleaseCompletedResources(true);
// Verify all resources (if any) have been transferred or freed.
DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
DCHECK_EQ(data_buffer_pool_->total_allocated_bytes(), 0);
scan_node_->RangeComplete(THdfsFileFormat::JSON,
stream_->file_desc()->file_compression);
CloseInternal();
}
Status HdfsJsonScanner::InitNewRange() {
DCHECK_EQ(scanner_state_, CREATED);
auto compression_type = stream_->file_desc()->file_compression;
// Update the decompressor based on the compression type of the file in the context.
DCHECK(compression_type != THdfsCompression::SNAPPY)
<< "FE should have generated SNAPPY_BLOCKED instead.";
// In Hadoop, text files compressed into .DEFLATE files contain deflate with zlib
// wrappings, as opposed to raw deflate, which is what THdfsCompression::DEFLATE
// implies. Since deflate is the default compression algorithm used in Hadoop, it
// makes sense to map it to type DEFAULT in Impala instead.
if (compression_type == THdfsCompression::DEFLATE) {
compression_type = THdfsCompression::DEFAULT;
}
RETURN_IF_ERROR(UpdateDecompressor(compression_type));
// TODO: Optimize for the zero-slot scan case (e.g. count(*)).
vector<string> schema;
schema.reserve(scan_node_->materialized_slots().size());
for (const SlotDescriptor* slot : scan_node_->materialized_slots()) {
schema.push_back(scan_node_->hdfs_table()->GetColumnDesc(slot).name());
}
text_converter_.reset(new TextConverter('\\', "", false, state_->strict_mode()));
json_parser_.reset(new JsonParser<HdfsJsonScanner>(schema, this));
json_parser_->ResetParser();
return Status::OK();
}
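// Produces tuples for 'row_batch' by driving the JsonParser over the scan range. On the
// first call this positions the parser at the first complete tuple; subsequent calls
// keep parsing and committing rows until the batch is at capacity, the limit is
// reached, or the range has been fully scanned.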
Status HdfsJsonScanner::GetNextInternal(RowBatch* row_batch) {
DCHECK(!eos_);
DCHECK_GE(scanner_state_, OPENED);
DCHECK_NE(scanner_state_, FINISHED);
current_pool_ = row_batch->tuple_data_pool();
if (scanner_state_ == OPENED) {
// Find the first tuple. If scanner_state_ is not SCANNING, it means we went through
// the entire scan range without finding a single tuple. The bytes will be picked up
// by the previous scan range in the same file.
RETURN_IF_ERROR(FindFirstTuple());
if (scanner_state_ != SCANNING) {
eos_ = true;
scanner_state_ = FINISHED;
return Status::OK();
}
}
int64_t tuple_buffer_size;
RETURN_IF_ERROR(
row_batch->ResizeAndAllocateTupleBuffer(state_, &tuple_buffer_size, &tuple_mem_));
tuple_ = reinterpret_cast<Tuple*>(tuple_mem_);
tuple_row_ = row_batch->GetRow(row_batch->AddRow());
while (scanner_state_ == SCANNING) {
num_tuples_materialized_ = 0;
int num_tuples = 0;
int max_tuples = row_batch->capacity() - row_batch->num_rows();
RETURN_IF_ERROR(ParseWrapper(max_tuples, &num_tuples));
COUNTER_ADD(scan_node_->rows_read_counter(), num_tuples);
// Because the processes of parsing JSON, materializing tuples, and even reading data
// are intertwined, it is expensive to accurately time them individually. Therefore, we
// use this approximation to measure the time spent materializing tuples. Note that the
// value obtained is always inflated because it also includes the time spent parsing
// JSON.
// TODO: find a better way.
COUNTER_SET(scan_node_->materialize_tuple_timer(),
parse_json_timer_->value() - get_buffer_timer_->value());
RETURN_IF_ERROR(CommitRows(num_tuples_materialized_, row_batch));
if (row_batch->AtCapacity() || scan_node_->ReachedLimitShared()) break;
}
if (scanner_state_ >= PAST_SCANNING || scan_node_->ReachedLimitShared()) {
eos_ = true;
scanner_state_ = FINISHED;
}
return Status::OK();
}
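// Positions the parser at the first complete tuple in the scan range. A range starting
// at offset 0 already begins at a tuple boundary; otherwise we skip ahead to the next
// JSON object. If none is found, the scanner stays in the OPENED state and the range's
// bytes are left to the scanner processing the previous range of the same file.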
Status HdfsJsonScanner::FindFirstTuple() {
DCHECK_EQ(scanner_state_, OPENED);
if (stream_->scan_range()->offset() == 0) {
scanner_state_ = SCANNING;
return Status::OK();
}
SCOPED_TIMER(parse_json_timer_);
if (json_parser_->MoveToNextJson()) {
scanner_state_ = SCANNING;
DCHECK_OK(buffer_status_);
}
return buffer_status_;
}
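// Thin wrapper around JsonParser::Parse(). The parser's data-fetch callback records I/O
// errors in 'buffer_status_' instead of returning them, so that status is checked first
// before propagating the parse status.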
Status HdfsJsonScanner::ParseWrapper(int max_tuples, int* num_tuples) {
DCHECK(json_parser_->IsTidy());
SCOPED_TIMER(parse_json_timer_);
Status status = json_parser_->Parse(max_tuples, num_tuples);
RETURN_IF_ERROR(buffer_status_);
return status;
}
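// Called when converting 'data' to the type of slot 'desc' fails. Sets the slot to
// NULL, logs the error if there is space, and returns false once parse_status_ is no
// longer OK (i.e. the scan should abort due to abort_on_error).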
bool HdfsJsonScanner::HandleConvertError(const SlotDescriptor* desc, const char* data,
int len) {
error_in_row_ = true;
tuple_->SetNull(desc->null_indicator_offset());
if (state_->LogHasSpace() || state_->abort_on_error()) {
const HdfsTableDescriptor* table = scan_node_->hdfs_table();
constexpr int max_view_len = 50;
string data_view = string(data, std::min(len, max_view_len));
if (len > max_view_len) data_view += "...";
string msg = Substitute("Error converting column: $0.$1.$2, type: $3, data: '$4'",
table->database(), table->name(), table->GetColumnDesc(desc).name(),
desc->type().DebugString(), data_view);
if (state_->LogHasSpace()) {
state_->LogError(ErrorMsg(TErrorCode::GENERAL, msg), 2);
}
if (state_->abort_on_error() && parse_status_.ok()) parse_status_ = Status(msg);
}
return parse_status_.ok();
}
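// Called by the JsonParser when rapidjson reports a parse error. kParseErrorTermination
// means our own callback aborted the parse and parse_status_ already holds the cause;
// other error codes are converted into a generic error with file and offset
// information.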
Status HdfsJsonScanner::HandleError(rapidjson::ParseErrorCode error, size_t offset) {
if (error == rapidjson::kParseErrorTermination) {
DCHECK(!parse_status_.ok());
RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
parse_status_ = Status::OK();
} else {
RETURN_IF_ERROR(state_->LogOrReturnError(ErrorMsg::Init(TErrorCode::GENERAL,
Substitute("Parse Json file error, file: $0, offset: $1, cause by: $2",
stream_->filename(), stream_->scan_range()->offset() + offset,
rapidjson::GetParseError_En(error)))));
}
return Status::OK();
}
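// Returns true if the first 'len' bytes at 'begin' contain only whitespace up to and
// including a newline; returns false otherwise, including when no newline is found.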
static bool AllWhitespaceBeforeNewline(uint8_t* begin, int64_t len) {
DCHECK_GE(len, 0);
uint8_t* end = begin + len;
while (begin != end) {
switch (*begin++) {
case '\r':
case '\t':
case ' ': break;
case '\n': return true;
default: return false;
}
}
return false;
}
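// Supplies raw (uncompressed) file bytes to the parser. Within the scan range we hand
// out whatever the stream has buffered; in the PAST_SCANNING state we read small blocks
// past the end of the range, stopping early when only whitespace precedes the first
// newline of the next range.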
Status HdfsJsonScanner::FillBytesToBuffer(uint8_t** buffer, int64_t* bytes_read) {
if (scanner_state_ == PAST_SCANNING) {
// In the PAST_SCANNING state, we only read a small block of data at a time for
// scanning. If the parser completes parsing the last JSON object, it will exit the
// loop due to BreakParse().
Status status;
if (UNLIKELY(!stream_->GetBytes(NEXT_BLOCK_READ_SIZE, buffer, bytes_read, &status))) {
DCHECK(!status.ok());
return status;
}
// A special case is when the first character of the next scan range is a newline
// character (perhaps with other whitespace characters before it). Our scan should
// stop at the first newline character in the next range, while the parser skips
// whitespace characters. If we don't handle this case, the first line of the next
// range will be scanned twice. Therefore, we need to return directly here to inform
// the parser that eos has been reached.
if (UNLIKELY(AllWhitespaceBeforeNewline(*buffer, *bytes_read))) {
scanner_state_ = FINISHED;
*bytes_read = 0;
}
} else {
RETURN_IF_ERROR(stream_->GetBuffer(false, buffer, bytes_read));
}
return Status::OK();
}
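// Callback invoked by the JsonParser when it runs out of data. Hands back the next
// buffer of (possibly decompressed) bytes, or leaves *begin == *end at end of stream.
// Errors are stored in buffer_status_ because the parser cannot propagate a Status.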
void HdfsJsonScanner::GetNextBuffer(const char** begin, const char** end) {
DCHECK(*begin == *end);
SCOPED_TIMER(get_buffer_timer_);
// eosr indicates that we have scanned all data within the scan range. If the scanner
// state is OPENED, it means that we encountered eosr in FindFirstTuple(), i.e. there is
// no tuple start in this scan range; those bytes will be handled by the scanner of the
// previous scan range in the same file. If the scanner state is SCANNING, it means that
// we have finished scanning the data within the range and need to read data from the
// next range to complete the scan.
if (stream_->eosr()) {
if (scanner_state_ == OPENED) return;
if (scanner_state_ == SCANNING) scanner_state_ = PAST_SCANNING;
}
if (stream_->eof() || scanner_state_ == FINISHED) return;
uint8_t* next_buffer_begin = nullptr;
int64_t next_buffer_size = 0;
if (decompressor_ == nullptr) {
buffer_status_ = FillBytesToBuffer(&next_buffer_begin, &next_buffer_size);
} else if (decompressor_->supports_streaming()) {
bool eosr = false;
// The JsonParser always copies values instead of referencing them, so it doesn't
// reference the data in the data_buffer_pool_. Therefore, we don't need current_pool_
// to acquire data from the data_buffer_pool_, so we pass nullptr to
// DecompressStreamToBuffer().
buffer_status_ = DecompressStreamToBuffer(&next_buffer_begin, &next_buffer_size,
nullptr, &eosr);
} else {
buffer_status_ = DecompressFileToBuffer(&next_buffer_begin, &next_buffer_size);
if (next_buffer_size == 0) scanner_state_ = FINISHED;
}
RETURN_VOID_IF_ERROR(buffer_status_);
if (UNLIKELY(next_buffer_size == 0)) return;
*begin = reinterpret_cast<char*>(next_buffer_begin);
*end = *begin + next_buffer_size;
}
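// Callback from the JsonParser before it starts a new row: initializes 'tuple_' from
// the template tuple.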
void HdfsJsonScanner::InitRow() {
InitTuple(template_tuple_, tuple_);
}
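// Callback from the JsonParser after all fields of a row have been written. Applies the
// conjuncts and, if the row survives, advances to the next tuple and row position.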
void HdfsJsonScanner::SubmitRow() {
tuple_row_->SetTuple(0, tuple_);
if (EvalConjuncts(tuple_row_)) {
++num_tuples_materialized_;
tuple_ = next_tuple(tuple_byte_size(), tuple_);
tuple_row_ = next_row(tuple_row_);
}
if (UNLIKELY(error_in_row_)) {
LogRowParseError();
error_in_row_ = false;
}
}
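// Writes 'data' into 'slot_desc' of the current tuple via the TextConverter, falling
// back to HandleConvertError() if the conversion fails.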
bool HdfsJsonScanner::InvokeWriteSlot(const SlotDescriptor* slot_desc, const char* data,
int len) {
// TODO: Support invoking the codegen'd WriteSlot().
if (LIKELY(text_converter_->WriteSlot(slot_desc, tuple_, data, len, true, false,
current_pool_))) {
return true;
}
return HandleConvertError(slot_desc, data, len);
}
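// The following Add*() methods are value callbacks from the JsonParser; 'index' is the
// position of the column within the materialized slots.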
void HdfsJsonScanner::AddNull(int index) {
const SlotDescriptor* slot_desc = scan_node_->materialized_slots()[index];
tuple_->SetNull(slot_desc->null_indicator_offset());
}
bool HdfsJsonScanner::AddBool(int index, bool value) {
const SlotDescriptor* slot_desc = scan_node_->materialized_slots()[index];
if (UNLIKELY(slot_desc->type().type != TYPE_BOOLEAN)) {
return InvokeWriteSlot(slot_desc, value ? "true" : "false", value ? 4 : 5);
}
void* slot = tuple_->GetSlot(slot_desc->tuple_offset());
*reinterpret_cast<bool*>(slot) = value;
return true;
}
bool HdfsJsonScanner::AddString(int index, const char* str, uint32_t len) {
const SlotDescriptor* slot_desc = scan_node_->materialized_slots()[index];
return InvokeWriteSlot(slot_desc, str, len);
}
bool HdfsJsonScanner::AddNumber(int index, const char* str, uint32_t len) {
const SlotDescriptor* slot_desc = scan_node_->materialized_slots()[index];
return InvokeWriteSlot(slot_desc, str, len);
}