// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_HDFS_AVRO_SCANNER_H
#define IMPALA_EXEC_HDFS_AVRO_SCANNER_H
/// This scanner reads Avro object container files (i.e., Avro data files) and writes the
/// content as tuples in the Impala in-memory representation of data (tuples, rows,
/// row batches).
///
/// The specification for Avro files can be found at
/// http://avro.apache.org/docs/current/spec.html (Avro 1.7.4 at the time of this
/// writing). At a high level, an Avro data file has
/// the following structure:
///
/// - Avro data file
/// - file header
/// - file version header
/// - file metadata
/// - JSON schema
/// - compression codec (optional)
/// - sync marker
/// - data block+
/// - # of Avro objects in block
/// - size of objects in block (post-compression)
/// - serialized objects
/// - sync marker
///
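/// As an illustration of the layout above, a minimal, hypothetical check of the
/// four-byte version header might look like:
///
///   uint8_t buf[4];
///   // ... read the first four bytes of the file into 'buf' ...
///   if (memcmp(buf, AVRO_VERSION_HEADER, sizeof(buf)) != 0) {
///     return Status("Not an Avro data file");  // wrong magic bytes
///   }
///
/// (The real parsing lives in ReadFileHeader() and ParseMetadata() below.)
///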
/// This implementation reads one data block at a time, using the schema from the file
/// header to decode the serialized objects. If possible, non-materialized columns are
/// skipped without being read. If codegen is enabled, we codegen a function based on the
/// table schema that parses records, materializes them to tuples, and evaluates the
/// conjuncts.
///
/// The Avro C library is used to parse the file's schema and the table's schema, which are
/// then resolved according to the Avro spec and transformed into our own schema
/// representation (i.e. a list of SchemaElements). Schema resolution allows users to
/// evolve the table schema and file schema(s) independently. The spec goes over all the
/// rules for schema resolution, but in summary:
///
/// - Record fields are matched by name (and thus can be reordered; the table schema
/// determines the order of the columns)
/// - Fields in the file schema not present in the table schema are ignored
/// - Fields in the table schema not present in the file schema must have a default value
/// specified
/// - Types can be "promoted" as follows:
/// int -> long -> float -> double
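///
/// For example, a file written with the record schema
///
///   {"type": "record", "name": "rec", "fields": [
///     {"name": "a", "type": "int"},
///     {"name": "b", "type": "string"}]}
///
/// can be read by a table declared as (b STRING, a BIGINT): the fields are
/// matched by name (so the table's column order determines the output order),
/// and 'a' is promoted from int to long. A table column 'c' absent from the
/// file schema would additionally need a default value.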
///
/// TODO:
/// - implement SkipComplex()
/// - codegen a function per unique file schema, rather than just the table schema
/// - once Exprs are thread-safe, we can cache the jitted function directly
/// - microbenchmark codegen'd functions (this and other scanners)
#include "exec/base-sequence-scanner.h"
#include <avro/basics.h>
#include "exec/read-write-util.h"
#include "runtime/tuple.h"
#include "runtime/tuple-row.h"
namespace llvm {
class BasicBlock;
class Value;
}
namespace impala {
class HdfsAvroScanner : public BaseSequenceScanner {
public:
/// The four-byte Avro version header present at the beginning of every
/// Avro file: {'O', 'b', 'j', 1}
static const uint8_t AVRO_VERSION_HEADER[4];
HdfsAvroScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
/// Codegen DecodeAvroData(). Stores the resulting function in 'decode_avro_data_fn'
/// if codegen is successful, or nullptr otherwise.
static Status Codegen(HdfsScanNodeBase* node,
const std::vector<ScalarExpr*>& conjuncts,
llvm::Function** decode_avro_data_fn)
WARN_UNUSED_RESULT;
static const char* LLVM_CLASS_NAME;
protected:
/// Implementation of BaseSequenceScanner superclass methods
virtual FileHeader* AllocateFileHeader();
/// TODO: check that file schema matches metadata schema
virtual Status ReadFileHeader() WARN_UNUSED_RESULT;
virtual Status InitNewRange() WARN_UNUSED_RESULT;
virtual Status ProcessRange(RowBatch* row_batch) WARN_UNUSED_RESULT;
virtual THdfsFileFormat::type file_format() const { return THdfsFileFormat::AVRO; }
private:
friend class HdfsAvroScannerTest;
struct AvroFileHeader : public BaseSequenceScanner::FileHeader {
/// The root of the file schema tree (i.e. the top-level record schema of the file)
ScopedAvroSchemaElement schema;
/// Template tuple for this file containing partition key values and default values.
/// Set to nullptr if there are no materialized partition keys and no default values
/// are necessary (i.e., all materialized fields are present in the file schema).
/// This tuple is created by the scanner processing the initial scan range with
/// the header. The ownership of memory is transferred to the scan-node pool,
/// such that it remains live when subsequent scanners process data ranges.
Tuple* template_tuple;
/// True if this file can use the codegen'd version of DecodeAvroData() (i.e. its
/// schema matches the table schema), false otherwise.
bool use_codegend_decode_avro_data;
};
AvroFileHeader* avro_header_ = nullptr;
/// The current decompressed data block, along with its end pointer and length.
uint8_t* data_block_ = nullptr;
uint8_t* data_block_end_ = nullptr;
int64_t data_block_len_ = 0;
/// Number of records in the current data block and the current record position.
int64_t num_records_in_block_ = 0;
int64_t record_pos_ = 0;
/// Metadata keys
static const std::string AVRO_SCHEMA_KEY;
static const std::string AVRO_CODEC_KEY;
/// Supported codecs, as they appear in the metadata
static const std::string AVRO_NULL_CODEC;
static const std::string AVRO_SNAPPY_CODEC;
static const std::string AVRO_DEFLATE_CODEC;
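/// Per the Avro spec, these correspond to the metadata entries "avro.schema" and
/// "avro.codec", and the codec values are the literal strings "null", "snappy",
/// and "deflate".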
typedef int (*DecodeAvroDataFn)(HdfsAvroScanner*, int, MemPool*, uint8_t**, uint8_t*,
Tuple*, TupleRow*);
/// The codegen'd version of DecodeAvroData() if available, nullptr otherwise.
DecodeAvroDataFn codegend_decode_avro_data_ = nullptr;
/// Utility function for decoding and parsing file header metadata
Status ParseMetadata() WARN_UNUSED_RESULT;
/// Resolves the table schema (i.e. the reader schema) against the file schema (i.e. the
/// writer schema), and sets the 'slot_desc' fields of the nodes of the file schema
/// corresponding to materialized slots. Calls WriteDefaultValue() as
/// appropriate. Returns a non-OK status if the schemas could not be resolved.
Status ResolveSchemas(const AvroSchemaElement& table_root,
AvroSchemaElement* file_root) WARN_UNUSED_RESULT;
/// Returns Status::OK iff 'table_schema' (the reader schema) can be resolved against
/// 'file_schema' (the writer schema). 'field_name' is used for error messages.
Status VerifyTypesMatch(const AvroSchemaElement& table_schema,
const AvroSchemaElement& file_schema, const string& field_name) WARN_UNUSED_RESULT;
/// Returns Status::OK iff a value with the given schema can be used to populate
/// 'slot_desc', as if 'schema' were the writer schema and 'slot_desc' the reader
/// schema. 'schema' can be either an avro_schema_t or an avro_datum_t.
Status VerifyTypesMatch(SlotDescriptor* slot_desc, avro_obj_t* schema)
WARN_UNUSED_RESULT;
/// Returns true if 'reader_type' can be used to read 'writer_type' according to the
/// Avro type promotion rules. Note that this does not handle nullability or TYPE_NULL.
bool VerifyTypesMatch(const ColumnType& reader_type, const ColumnType& writer_type);
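/// A hedged sketch of the promotion check above ('Promotable' is a hypothetical
/// helper, not part of this class):
///
///   // Writer type 'w' is readable as reader type 'r' along the chain
///   // int -> long -> float -> double.
///   bool Promotable(PrimitiveType w, PrimitiveType r) {
///     if (w == r) return true;
///     if (w == TYPE_INT) return r == TYPE_BIGINT || r == TYPE_FLOAT || r == TYPE_DOUBLE;
///     if (w == TYPE_BIGINT) return r == TYPE_FLOAT || r == TYPE_DOUBLE;
///     if (w == TYPE_FLOAT) return r == TYPE_DOUBLE;
///     return false;
///   }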
/// Writes 'default_value' to 'slot_desc' in the template tuple, initializing the
/// template tuple if it doesn't already exist. Returns a non-OK status if slot_desc's
/// and default_value's types are incompatible or unsupported. field_name is used for
/// error messages.
Status WriteDefaultValue(SlotDescriptor* slot_desc, avro_datum_t default_value,
const char* field_name) WARN_UNUSED_RESULT;
/// Decodes records and copies the data into tuples.
/// Returns the number of tuples to be committed.
/// - max_tuples: the maximum number of tuples to write
/// - data: serialized record data. Is advanced as records are read.
/// - data_end: pointer to the end of the data buffer (i.e. the first invalid byte).
/// - pool: memory pool to allocate string data from
/// - tuple: tuple pointer to copy objects to
/// - tuple_row: tuple row of written tuples
int DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** data, uint8_t* data_end,
Tuple* tuple, TupleRow* tuple_row);
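/// A minimal sketch of the intended call pattern (hypothetical call site;
/// assumes the caller commits the rows and checks parse_status_ afterwards):
///
///   uint8_t* data = data_block_;
///   int num_committed = DecodeAvroData(
///       max_tuples, pool, &data, data_block_end_, tuple, tuple_row);
///   RETURN_IF_ERROR(parse_status_);
///   // ... commit 'num_committed' rows to the row batch ...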
/// Materializes a single tuple from serialized record data. Will return false and set
/// error in parse_status_ if memory limit is exceeded when allocating new char buffer.
/// See comments below for ReadAvroChar().
bool MaterializeTuple(const AvroSchemaElement& record_schema, MemPool* pool,
uint8_t** data, uint8_t* data_end, Tuple* tuple);
/// Produces a version of DecodeAvroData() that uses codegen'd instead of interpreted
/// functions. Stores the resulting function in 'decode_avro_data_fn' if codegen is
/// successful; otherwise returns an error.
static Status CodegenDecodeAvroData(const HdfsScanNodeBase* node, LlvmCodeGen* codegen,
const std::vector<ScalarExpr*>& conjuncts,
llvm::Function** decode_avro_data_fn) WARN_UNUSED_RESULT;
/// Codegens a version of MaterializeTuple() that reads records based on the table
/// schema. Stores the resulting function in 'materialize_tuple_fn' if codegen is
/// successful; otherwise returns an error.
/// TODO: Codegen a function for each unique file schema.
static Status CodegenMaterializeTuple(const HdfsScanNodeBase* node,
LlvmCodeGen* codegen, llvm::Function** materialize_tuple_fn) WARN_UNUSED_RESULT;
/// Used by CodegenMaterializeTuple to recursively create the IR for reading an Avro
/// record.
/// - path: the column path constructed so far. This is used to find the slot desc, if
/// any, associated with each field of the record. Note that this assumes the
/// table's Avro schema matches up with the table's column definitions by ordinal.
/// - builder: used to insert the IR, starting at the current insert point. The insert
/// point will be left at the end of the record but before the 'insert_before'
/// block.
/// - insert_before: the block to insert any new blocks directly before. This is either
/// the bail_out block or some basic blocks before that.
/// - bail_out: the block to jump to if anything fails. This is used in particular by
/// ReadAvroChar() which can exceed memory limit during allocation from MemPool.
/// - this_val, pool_val, tuple_val, data_val, data_end_val: arguments to
/// MaterializeTuple()
/// - child_start / child_end: specifies to only generate a subset of the record
/// schema's children
static Status CodegenReadRecord(const SchemaPath& path, const AvroSchemaElement& record,
int child_start, int child_end, const HdfsScanNodeBase* node, LlvmCodeGen* codegen,
void* builder, llvm::Function* fn, llvm::BasicBlock* insert_before,
llvm::BasicBlock* bail_out, llvm::Value* this_val, llvm::Value* pool_val,
llvm::Value* tuple_val, llvm::Value* data_val,
llvm::Value* data_end_val) WARN_UNUSED_RESULT;
/// Creates the IR for reading an Avro scalar at builder's current insert point.
static Status CodegenReadScalar(const AvroSchemaElement& element,
SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder,
llvm::Value* this_val, llvm::Value* pool_val, llvm::Value* tuple_val,
llvm::Value* data_val, llvm::Value* data_end_val, llvm::Value** ret_val)
WARN_UNUSED_RESULT;
/// The following are cross-compiled functions for parsing a serialized Avro primitive
/// type and writing it to a slot. They can also be used for skipping a field without
/// writing it to a slot by setting 'write_slot' to false.
/// - data: Serialized record data. Is advanced past the read field.
/// - data_end: pointer to the end of the data buffer (i.e. the first invalid byte).
/// The following arguments are used only if 'write_slot' is true:
/// - slot: The tuple slot to write the parsed field into.
/// - type: The type of the slot. (This is necessary because there is not a 1:1 mapping
/// between Avro types and Impala's primitive types.)
/// - pool: MemPool for string data.
///
/// All return false and set parse_status_ on error (e.g. mem limit exceeded when
/// allocating buffer, malformed data), and return true otherwise.
///
bool ReadAvroBoolean(PrimitiveType type, uint8_t** data, uint8_t* data_end,
bool write_slot, void* slot, MemPool* pool);
bool ReadAvroDate(PrimitiveType type, uint8_t** data, uint8_t* data_end,
bool write_slot, void* slot, MemPool* pool);
bool ReadAvroInt32(PrimitiveType type, uint8_t** data, uint8_t* data_end,
bool write_slot, void* slot, MemPool* pool);
bool ReadAvroInt64(PrimitiveType type, uint8_t** data, uint8_t* data_end,
bool write_slot, void* slot, MemPool* pool);
bool ReadAvroFloat(PrimitiveType type, uint8_t** data, uint8_t* data_end,
bool write_slot, void* slot, MemPool* pool);
bool ReadAvroDouble(PrimitiveType type, uint8_t** data, uint8_t* data_end,
bool write_slot, void* slot, MemPool* pool);
bool ReadAvroVarchar(PrimitiveType type, int max_len, uint8_t** data, uint8_t* data_end,
bool write_slot, void* slot, MemPool* pool);
bool ReadAvroChar(PrimitiveType type, int max_len, uint8_t** data, uint8_t* data_end,
bool write_slot, void* slot, MemPool* pool);
bool ReadAvroString(PrimitiveType type, uint8_t** data, uint8_t* data_end,
bool write_slot, void* slot, MemPool* pool);
/// Helper function for some of the above. Returns the length of certain varlen
/// types and updates 'data'. If an error is encountered, returns a non-OK result and
/// updates parse_status_.
ReadWriteUtil::ZLongResult ReadFieldLen(uint8_t** data, uint8_t* data_end);
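/// For reference, Avro varlen values are prefixed with their length as a
/// zigzag-encoded long: the string "hi" is serialized as the bytes
/// { 0x04, 'h', 'i' }, where 0x04 is the zigzag/varint encoding of the length 2.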
/// Same as the above functions, except takes the size of the decimal slot (i.e. 4, 8, or
/// 16) instead of the type (which should be TYPE_DECIMAL). The slot size is passed
/// explicitly, rather than passing a ColumnType, so we can easily pass in a constant in
/// the codegen'd MaterializeTuple() function. If 'write_slot' is false, 'slot_byte_size'
/// is ignored.
bool ReadAvroDecimal(
int slot_byte_size, uint8_t** data, uint8_t* data_end, bool write_slot, void* slot,
MemPool* pool);
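/// Per the Avro spec, the decimal itself is a length-prefixed byte array holding
/// the unscaled value as a big-endian two's-complement integer; e.g. 12.34 at
/// scale 2 (unscaled value 1234 = 0x04D2) is serialized as { 0x04, 0x04, 0xD2 }.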
/// Reads and advances 'data' past the union branch index and sets 'is_null' according
/// to whether the corresponding element is null. 'null_union_position' must be 0 or
/// 1. Returns false and sets parse_status_ if there's an error, otherwise returns true.
bool ReadUnionType(int null_union_position, uint8_t** data, uint8_t* data_end,
bool* is_null);
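/// For example, with the common ["null", "int"] union (null_union_position 0),
/// a serialized branch index byte of 0x00 (zigzag for 0) means the value is null
/// and nothing follows, while 0x02 (zigzag for 1) means a zigzag-encoded int
/// value follows.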
/// Helper functions to set parse_status_ outside of cross-compiled functions. This is
/// to avoid including string construction, etc. in the IR, which both bloats it and
/// can contain exception-handling code.
void SetStatusCorruptData(TErrorCode::type error_code);
void SetStatusInvalidValue(TErrorCode::type error_code, int64_t len);
void SetStatusValueOverflow(TErrorCode::type error_code, int64_t len, int64_t limit);
/// Unit test constructor
HdfsAvroScanner();
};
} // namespace impala
#endif // IMPALA_EXEC_HDFS_AVRO_SCANNER_H