blob: 08551b74c1db1943bbc1698a28a217f42d8c230c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_HDFS_JSON_SCANNER_H
#define IMPALA_EXEC_HDFS_JSON_SCANNER_H
#include <memory>
#include "common/status.h"
#include "exec/json/json-parser.h"
#include "exec/text/hdfs-text-scanner.h"
#include "runtime/mem-pool.h"
#include "runtime/tuple-row.h"
#include "util/runtime-profile-counters.h"
namespace impala {
class ScannerContext;
struct HdfsFileDesc;
/// HdfsScanner implementation that understands json-formatted records.
///
/// Splitting json files:
/// Similar to HdfsTextScanner, this scanner handles json files split across multiple
/// blocks/scan ranges. Note that the split can occur anywhere in the file, e.g. in the
/// middle of a row. Each scanner starts materializing tuples right after the first row
/// delimiter found in the scan range, and stops at the first row delimiter occurring past
/// the end of the scan range. If no delimiter is found in the scan range, the scanner
/// doesn't materialize anything. This scheme ensures that every row is materialized by
/// exactly one scanner.
///
/// Error handling:
/// Two types of errors may occur while scanning JSON. The first type is data conversion
/// errors, such as attempting to convert a non-numeric string to a number. These errors
/// are detected and reported by TextConverter and handled by
/// HdfsJsonScanner::HandleConvertError(), which sets the slot to NULL and records the
/// error. If abort_on_error is true, it also returns false to the event handling
/// function of JsonParser, causing the JsonParser to interrupt parsing and report the
/// kParseErrorTermination error code. The second type of error occurs when the JSON
/// itself is malformed, e.g. it is missing colons or commas or contains invalid values.
/// These errors are detected by JsonParser and reported as the corresponding error
/// codes. They are handled by HdfsJsonScanner::HandleError(), which records the error
/// and, if abort_on_error is true, also returns an error status to JsonParser, causing
/// the query to be aborted.
class HdfsJsonScanner : public HdfsScanner {
 public:
  HdfsJsonScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
  virtual ~HdfsJsonScanner();

  /// Implementation of HdfsScanner interface.
  virtual Status Open(ScannerContext* context) override WARN_UNUSED_RESULT;
  virtual void Close(RowBatch* row_batch) override;

  THdfsFileFormat::type file_format() const override {
    return THdfsFileFormat::JSON;
  }

  /// Issue io manager byte ranges for 'files'. Range issuing is shared with the text
  /// scanner, so this simply delegates to HdfsTextScanner::IssueInitialRanges().
  static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
      const std::vector<HdfsFileDesc*>& files) WARN_UNUSED_RESULT {
    return HdfsTextScanner::IssueInitialRanges(scan_node, files);
  }

  /// Return true if we have builtin support for scanning JSON (text) files compressed
  /// with this codec. Delegates to HdfsTextScanner::HasBuiltinSupport(), i.e. the set of
  /// supported codecs is the same as for text files.
  static bool HasBuiltinSupport(THdfsCompression::type compression) {
    return HdfsTextScanner::HasBuiltinSupport(compression);
  }

 private:
  /// JsonParser drives parsing and calls back into the private callback functions
  /// declared at the bottom of this class.
  friend class JsonParser<HdfsJsonScanner>;

  virtual Status InitNewRange() override WARN_UNUSED_RESULT;
  virtual Status GetNextInternal(RowBatch* row_batch) override WARN_UNUSED_RESULT;

  /// Find the start of the first tuple in this scan range. If successful, advances the
  /// scanner state to SCANNING. Otherwise, consume the entire scan range without updating
  /// the scanner state (e.g. if there is a very large JSON object).
  Status FindFirstTuple() WARN_UNUSED_RESULT;

  /// A wrapper around JsonParser::Parse() that checks for buffer errors before returning
  /// its Status. If there are any buffer errors, they will be returned instead of the
  /// Status from JsonParser::Parse(). Because GetNextBuffer() is called as a callback
  /// (and therefore cannot return a Status itself), we need this approach to surface
  /// buffer errors.
  Status ParseWrapper(int max_tuples, int* num_tuples) WARN_UNUSED_RESULT;

  /// Called when adding a value to the tuple fails. Sets the target slot to null and
  /// reports the error message. Returns false if necessary to abort the scan.
  bool HandleConvertError(const SlotDescriptor* desc, const char* data, int len);

  /// Fills bytes to buffer from the context. If 'scanner_state_' is PAST_SCANNING, the
  /// scanner will read a small block of data, otherwise it will just read whatever is
  /// the io mgr buffer size.
  Status FillBytesToBuffer(uint8_t** buffer, int64_t* bytes_read) WARN_UNUSED_RESULT;

  /// Scanner state, advances as the scanning process progresses.
  enum ScannerState {
    /// Scanner has been created but not yet opened.
    CREATED,
    /// Scanner is opened, ready to work.
    OPENED,
    /// Enter this state after finding the position of the first tuple in ScanRange,
    /// indicating that the scanner is scanning data normally.
    SCANNING,
    /// Indicates that the scan range has been scanned, but the first row of data past the
    /// end of the scan range still needs to be read for parsing.
    PAST_SCANNING,
    /// Scanning is finished, no more data to process.
    FINISHED
  };
  /// Current scanner state. Default-initialized here so it is never indeterminate, even
  /// if a constructor path does not set it explicitly.
  ScannerState scanner_state_ = CREATED;

  /// The returned status when fetching data from stream. Set in GetNextBuffer() and
  /// checked in ParseWrapper().
  Status buffer_status_;

  /// TupleRow pointer of current row batch, set in GetNextInternal().
  TupleRow* tuple_row_ = nullptr;

  /// MemPool pointer of current row batch, set in GetNextInternal().
  MemPool* current_pool_ = nullptr;

  /// Indicates whether an error has occurred in the currently parsed row.
  bool error_in_row_ = false;

  /// A counter for the number of tuples materialized during a single ParseWrapper() call.
  int num_tuples_materialized_ = 0;

  /// Time spent invoking JsonParser. This roughly includes the time for parsing the
  /// JSON, materializing the tuples, and reading the data.
  RuntimeProfile::Counter* parse_json_timer_ = nullptr;

  /// Time spent fetching the next buffer in GetNextBuffer().
  RuntimeProfile::Counter* get_buffer_timer_ = nullptr;

  /// Size in bytes of each read issued once the scanner has moved past the end of its
  /// scan range (see FillBytesToBuffer()). Presumably kept small because only the
  /// remainder of the last row is needed at that point.
  static const int NEXT_BLOCK_READ_SIZE = 64 * 1024;

  /// JsonParser is a class template that implements parsing of a JSON data stream. It is
  /// supplied with a data buffer by its template parameter Scanner, and some callback
  /// functions for assembling the parsing results into row format; see details in the
  /// JsonParser class comment.
  std::unique_ptr<JsonParser<HdfsJsonScanner>> json_parser_;

  /// Invoke WriteSlot (CodeGen or Interpret version) to materialize the slot, and handle
  /// errors when conversion fails. The bool result presumably follows
  /// HandleConvertError()'s convention (false => abort the scan) — confirm in the .cc.
  inline bool InvokeWriteSlot(const SlotDescriptor* slot_desc, const char* data, int len);

  /// All functions below are callback functions provided for JsonParser, with their
  /// specific uses described in the JsonParser class comment.
  Status HandleError(rapidjson::ParseErrorCode error, size_t offset);
  /// Returns true once the scanner has reached PAST_SCANNING.
  bool BreakParse() { return scanner_state_ == PAST_SCANNING; }
  void GetNextBuffer(const char** begin, const char** end);
  void InitRow();
  void SubmitRow();
  void AddNull(int index);
  bool AddBool(int index, bool value);
  bool AddString(int index, const char* str, uint32_t len);
  bool AddNumber(int index, const char* str, uint32_t len);
};
}
#endif