blob: 6c737a4c6893dab27c35a7e17edbfcc9dcc466de [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/hdfs-avro-scanner.h"
#include <avro/errors.h>
#include <avro/legacy.h>
#include <gutil/strings/substitute.h>
#include "codegen/llvm-codegen.h"
#include "exec/hdfs-scan-node.h"
#include "exec/read-write-util.h"
#include "exec/scanner-context.inline.h"
#include "runtime/raw-value.h"
#include "runtime/runtime-state.h"
#include "util/codec.h"
#include "util/decompress.h"
#include "util/runtime-profile-counters.h"
#include "util/test-info.h"
#include "common/names.h"
// Note: the Avro C++ library uses exceptions for error handling. Any Avro
// function that may throw an exception must be placed in a try/catch block.
using namespace impala;
using namespace llvm;
using namespace strings;
const char* HdfsAvroScanner::LLVM_CLASS_NAME = "class.impala::HdfsAvroScanner";
const uint8_t HdfsAvroScanner::AVRO_VERSION_HEADER[4] = {'O', 'b', 'j', 1};
const string HdfsAvroScanner::AVRO_SCHEMA_KEY("avro.schema");
const string HdfsAvroScanner::AVRO_CODEC_KEY("avro.codec");
const string HdfsAvroScanner::AVRO_NULL_CODEC("null");
const string HdfsAvroScanner::AVRO_SNAPPY_CODEC("snappy");
const string HdfsAvroScanner::AVRO_DEFLATE_CODEC("deflate");
const string AVRO_MEM_LIMIT_EXCEEDED = "HdfsAvroScanner::$0() failed to allocate "
"$1 bytes for $2.";
#define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
HdfsAvroScanner::HdfsAvroScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
: BaseSequenceScanner(scan_node, state),
avro_header_(NULL),
codegend_decode_avro_data_(NULL) {
}
HdfsAvroScanner::HdfsAvroScanner()
: BaseSequenceScanner(),
avro_header_(NULL),
codegend_decode_avro_data_(NULL) {
DCHECK(TestInfo::is_test());
}
Status HdfsAvroScanner::Open(ScannerContext* context) {
RETURN_IF_ERROR(BaseSequenceScanner::Open(context));
if (scan_node_->avro_schema().schema == NULL) {
return Status("Missing Avro schema in scan node. This could be due to stale "
"metadata. Running 'invalidate metadata <tablename>' may resolve the problem.");
}
return Status::OK();
}
Status HdfsAvroScanner::Codegen(HdfsScanNodeBase* node,
const vector<ExprContext*>& conjunct_ctxs, Function** decode_avro_data_fn) {
*decode_avro_data_fn = NULL;
DCHECK(node->runtime_state()->ShouldCodegen());
LlvmCodeGen* codegen = node->runtime_state()->codegen();
DCHECK(codegen != NULL);
Function* materialize_tuple_fn = NULL;
RETURN_IF_ERROR(CodegenMaterializeTuple(node, codegen, &materialize_tuple_fn));
DCHECK(materialize_tuple_fn != NULL);
RETURN_IF_ERROR(CodegenDecodeAvroData(codegen, materialize_tuple_fn,
conjunct_ctxs, decode_avro_data_fn));
DCHECK(*decode_avro_data_fn != NULL);
return Status::OK();
}
BaseSequenceScanner::FileHeader* HdfsAvroScanner::AllocateFileHeader() {
AvroFileHeader* header = new AvroFileHeader();
header->template_tuple = template_tuple_;
return header;
}
Status HdfsAvroScanner::ReadFileHeader() {
avro_header_ = reinterpret_cast<AvroFileHeader*>(header_);
// Check version header
uint8_t* header;
RETURN_IF_FALSE(stream_->ReadBytes(
sizeof(AVRO_VERSION_HEADER), &header, &parse_status_));
if (memcmp(header, AVRO_VERSION_HEADER, sizeof(AVRO_VERSION_HEADER))) {
return Status(TErrorCode::AVRO_BAD_VERSION_HEADER,
stream_->filename(), ReadWriteUtil::HexDump(header, sizeof(AVRO_VERSION_HEADER)));
}
// Decode relevant metadata (encoded as Avro map)
RETURN_IF_ERROR(ParseMetadata());
// Read file sync marker
uint8_t* sync;
RETURN_IF_FALSE(stream_->ReadBytes(SYNC_HASH_SIZE, &sync, &parse_status_));
memcpy(header_->sync, sync, SYNC_HASH_SIZE);
header_->header_size = stream_->total_bytes_returned() - SYNC_HASH_SIZE;
return Status::OK();
}
Status HdfsAvroScanner::ParseMetadata() {
header_->is_compressed = false;
header_->compression_type = THdfsCompression::NONE;
int64_t num_entries;
RETURN_IF_FALSE(stream_->ReadZLong(&num_entries, &parse_status_));
if (num_entries < 1) {
return Status(TErrorCode::AVRO_INVALID_METADATA_COUNT, stream_->filename(),
num_entries, stream_->file_offset());
}
while (num_entries != 0) {
DCHECK_GT(num_entries, 0);
for (int i = 0; i < num_entries; ++i) {
// Decode Avro string-type key
string key;
uint8_t* key_buf;
int64_t key_len;
RETURN_IF_FALSE(stream_->ReadZLong(&key_len, &parse_status_));
if (key_len < 0) {
return Status(TErrorCode::AVRO_INVALID_LENGTH, stream_->filename(), key_len,
stream_->file_offset());
}
RETURN_IF_FALSE(stream_->ReadBytes(key_len, &key_buf, &parse_status_));
key = string(reinterpret_cast<char*>(key_buf), key_len);
// Decode Avro bytes-type value
uint8_t* value;
int64_t value_len;
RETURN_IF_FALSE(stream_->ReadZLong(&value_len, &parse_status_));
if (value_len < 0) {
return Status(TErrorCode::AVRO_INVALID_LENGTH, stream_->filename(), value_len,
stream_->file_offset());
}
RETURN_IF_FALSE(stream_->ReadBytes(value_len, &value, &parse_status_));
if (key == AVRO_SCHEMA_KEY) {
avro_schema_t raw_file_schema;
int error = avro_schema_from_json_length(
reinterpret_cast<char*>(value), value_len, &raw_file_schema);
if (error != 0) {
stringstream ss;
ss << "Failed to parse file schema: " << avro_strerror();
return Status(ss.str());
}
AvroSchemaElement* file_schema = avro_header_->schema.get();
RETURN_IF_ERROR(AvroSchemaElement::ConvertSchema(raw_file_schema, file_schema));
RETURN_IF_ERROR(ResolveSchemas(scan_node_->avro_schema(), file_schema));
// We currently codegen a function only for the table schema. If this file's
// schema is different from the table schema, don't use the codegen'd function and
// use the interpreted path instead.
avro_header_->use_codegend_decode_avro_data = avro_schema_equal(
scan_node_->avro_schema().schema, file_schema->schema);
} else if (key == AVRO_CODEC_KEY) {
string avro_codec(reinterpret_cast<char*>(value), value_len);
if (avro_codec != AVRO_NULL_CODEC) {
header_->is_compressed = true;
// This scanner doesn't use header_->codec (Avro doesn't use the
// Hadoop codec strings), but fill it in for logging
header_->codec = avro_codec;
if (avro_codec == AVRO_SNAPPY_CODEC) {
header_->compression_type = THdfsCompression::SNAPPY;
} else if (avro_codec == AVRO_DEFLATE_CODEC) {
header_->compression_type = THdfsCompression::DEFLATE;
} else {
return Status("Unknown Avro compression codec: " + avro_codec);
}
}
} else {
VLOG_ROW << "Skipping metadata entry: " << key;
}
}
RETURN_IF_FALSE(stream_->ReadZLong(&num_entries, &parse_status_));
if (num_entries < 0) {
return Status(TErrorCode::AVRO_INVALID_METADATA_COUNT, stream_->filename(),
num_entries, stream_->file_offset());
}
}
VLOG_FILE << stream_->filename() << ": "
<< (header_->is_compressed ? "compressed" : "not compressed");
if (header_->is_compressed) VLOG_FILE << header_->codec;
if (avro_header_->schema->children.empty()) {
return Status("Schema not found in file header metadata");
}
return Status::OK();
}
// Schema resolution is performed per materialized slot (meaning we don't perform schema
// resolution for non-materialized columns). For each slot, we traverse the table schema
// using the column path (i.e., the traversal is by ordinal). We simultaneously traverse
// the file schema using the table schema's field names. The final field should exist in
// both schemas and be promotable to the slot type. If the file schema is missing a field,
// we check for a default value in the table schema and use that instead.
// TODO: test unresolvable schemas
// TODO: improve error messages
Status HdfsAvroScanner::ResolveSchemas(const AvroSchemaElement& table_root,
AvroSchemaElement* file_root) {
if (table_root.schema->type != AVRO_RECORD) return Status("Table schema is not a record");
if (file_root->schema->type != AVRO_RECORD) return Status("File schema is not a record");
// Associate each slot descriptor with a field in the file schema, or fill in the
// template tuple with a default value from the table schema.
for (SlotDescriptor* slot_desc: scan_node_->materialized_slots()) {
// Traverse the column path, simultaneously traversing the table schema by ordinal and
// the file schema by field name from the table schema.
const SchemaPath& path = slot_desc->col_path();
const AvroSchemaElement* table_record = &table_root;
AvroSchemaElement* file_record = file_root;
for (int i = 0; i < path.size(); ++i) {
int table_field_idx = i > 0 ? path[i] : path[i] - scan_node_->num_partition_keys();
int num_fields = table_record->children.size();
if (table_field_idx >= num_fields) {
// TODO: add path to error message (and elsewhere)
return Status(TErrorCode::AVRO_MISSING_FIELD, table_field_idx, num_fields);
}
const char* field_name =
avro_schema_record_field_name(table_record->schema, table_field_idx);
int file_field_idx =
avro_schema_record_field_get_index(file_record->schema, field_name);
if (file_field_idx < 0) {
// This field doesn't exist in the file schema. Check if there is a default value.
avro_datum_t default_value =
avro_schema_record_field_default(table_record->schema, table_field_idx);
if (default_value == NULL) {
return Status(TErrorCode::AVRO_MISSING_DEFAULT, field_name);
}
RETURN_IF_ERROR(WriteDefaultValue(slot_desc, default_value, field_name));
DCHECK_EQ(i, path.size() - 1) <<
"WriteDefaultValue() doesn't support default records yet, should have failed";
continue;
}
const AvroSchemaElement& table_field = table_record->children[table_field_idx];
AvroSchemaElement& file_field = file_record->children[file_field_idx];
RETURN_IF_ERROR(VerifyTypesMatch(table_field, file_field, field_name));
if (i != path.size() - 1) {
// All but the last index in 'path' should be a record field
if (table_record->schema->type != AVRO_RECORD) {
return Status(TErrorCode::AVRO_NOT_A_RECORD, field_name);
} else {
DCHECK_EQ(file_record->schema->type, AVRO_RECORD);
}
table_record = &table_field;
file_record = &file_field;
} else {
// This should be the field corresponding to 'slot_desc'. Check that slot_desc can
// be resolved to the table's Avro schema.
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, table_field.schema));
file_field.slot_desc = slot_desc;
}
}
}
return Status::OK();
}
Status HdfsAvroScanner::WriteDefaultValue(
SlotDescriptor* slot_desc, avro_datum_t default_value, const char* field_name) {
if (avro_header_->template_tuple == NULL) {
if (template_tuple_ != NULL) {
avro_header_->template_tuple = template_tuple_;
} else {
avro_header_->template_tuple =
Tuple::Create(tuple_byte_size_, template_tuple_pool_.get());
}
}
switch (default_value->type) {
case AVRO_BOOLEAN: {
// We don't call VerifyTypesMatch() above the switch statement so we don't want to
// call it in the default case (since we VerifyTypesMatch() can't handle every type
// either, and we want to return the correct error message).
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
int8_t v;
if (avro_boolean_get(default_value, &v)) DCHECK(false);
RawValue::Write(&v, avro_header_->template_tuple, slot_desc, NULL);
break;
}
case AVRO_INT32: {
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
int32_t v;
if (avro_int32_get(default_value, &v)) DCHECK(false);
RawValue::Write(&v, avro_header_->template_tuple, slot_desc, NULL);
break;
}
case AVRO_INT64: {
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
int64_t v;
if (avro_int64_get(default_value, &v)) DCHECK(false);
RawValue::Write(&v, avro_header_->template_tuple, slot_desc, NULL);
break;
}
case AVRO_FLOAT: {
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
float v;
if (avro_float_get(default_value, &v)) DCHECK(false);
RawValue::Write(&v, avro_header_->template_tuple, slot_desc, NULL);
break;
}
case AVRO_DOUBLE: {
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
double v;
if (avro_double_get(default_value, &v)) DCHECK(false);
RawValue::Write(&v, avro_header_->template_tuple, slot_desc, NULL);
break;
}
case AVRO_STRING:
case AVRO_BYTES: {
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
char* v;
if (avro_string_get(default_value, &v)) DCHECK(false);
StringValue sv(v);
RawValue::Write(&sv, avro_header_->template_tuple, slot_desc,
template_tuple_pool_.get());
break;
}
case AVRO_NULL:
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
avro_header_->template_tuple->SetNull(slot_desc->null_indicator_offset());
break;
default:
return Status(TErrorCode::AVRO_UNSUPPORTED_DEFAULT_VALUE, field_name,
avro_type_name(default_value->type));
}
return Status::OK();
}
Status HdfsAvroScanner::VerifyTypesMatch(const AvroSchemaElement& table_schema,
const AvroSchemaElement& file_schema, const string& field_name) {
if (!table_schema.nullable() && file_schema.nullable()) {
// Use ErrorMsg because corresponding Status ctor is ambiguous
ErrorMsg msg(TErrorCode::AVRO_NULLABILITY_MISMATCH, field_name);
return Status(msg);
}
if (file_schema.schema->type == AVRO_NULL) {
if (table_schema.schema->type == AVRO_NULL || table_schema.nullable()) {
return Status::OK();
} else {
return Status(TErrorCode::AVRO_SCHEMA_RESOLUTION_ERROR, field_name,
avro_type_name(table_schema.schema->type),
avro_type_name(file_schema.schema->type));
}
}
// Can't convert records to ColumnTypes, check here instead of below
// TODO: update if/when we have TYPE_STRUCT primitive type
if ((table_schema.schema->type == AVRO_RECORD) ^
(file_schema.schema->type == AVRO_RECORD)) {
return Status(TErrorCode::AVRO_SCHEMA_RESOLUTION_ERROR, field_name,
avro_type_name(table_schema.schema->type),
avro_type_name(file_schema.schema->type));
} else if (table_schema.schema->type == AVRO_RECORD) {
DCHECK_EQ(file_schema.schema->type, AVRO_RECORD);
return Status::OK();
}
ColumnType reader_type;
RETURN_IF_ERROR(AvroSchemaToColumnType(table_schema.schema, field_name, &reader_type));
ColumnType writer_type;
RETURN_IF_ERROR(AvroSchemaToColumnType(file_schema.schema, field_name, &writer_type));
bool match = VerifyTypesMatch(reader_type, writer_type);
if (match) return Status::OK();
return Status(TErrorCode::AVRO_SCHEMA_RESOLUTION_ERROR, field_name,
avro_type_name(table_schema.schema->type),
avro_type_name(file_schema.schema->type));
}
Status HdfsAvroScanner::VerifyTypesMatch(SlotDescriptor* slot_desc, avro_obj_t* schema) {
// TODO: make this work for nested fields
const string& col_name =
scan_node_->hdfs_table()->col_descs()[slot_desc->col_pos()].name();
// All Impala types are nullable
if (schema->type == AVRO_NULL) return Status::OK();
// Can't convert records to ColumnTypes, check here instead of below
// TODO: update if/when we have TYPE_STRUCT primitive type
if (schema->type == AVRO_RECORD) {
return Status(TErrorCode::AVRO_SCHEMA_METADATA_MISMATCH, col_name,
slot_desc->type().DebugString(), avro_type_name(schema->type));
}
ColumnType file_type;
RETURN_IF_ERROR(AvroSchemaToColumnType(schema, col_name, &file_type));
bool match = VerifyTypesMatch(slot_desc->type(), file_type);
if (match) return Status::OK();
return Status(TErrorCode::AVRO_SCHEMA_METADATA_MISMATCH, col_name,
slot_desc->type().DebugString(), avro_type_name(schema->type));
}
bool HdfsAvroScanner::VerifyTypesMatch(
const ColumnType& reader_type, const ColumnType& writer_type) {
switch (writer_type.type) {
case TYPE_DECIMAL:
if (reader_type.type != TYPE_DECIMAL) return false;
if (reader_type.scale != writer_type.scale) return false;
if (reader_type.precision != writer_type.precision) return false;
return true;
case TYPE_STRING: return reader_type.IsStringType();
case TYPE_INT:
switch(reader_type.type) {
case TYPE_INT:
// Type promotion
case TYPE_BIGINT:
case TYPE_FLOAT:
case TYPE_DOUBLE:
return true;
default:
return false;
}
case TYPE_BIGINT:
switch(reader_type.type) {
case TYPE_BIGINT:
// Type promotion
case TYPE_FLOAT:
case TYPE_DOUBLE:
return true;
default:
return false;
}
case TYPE_FLOAT:
switch(reader_type.type) {
case TYPE_FLOAT:
// Type promotion
case TYPE_DOUBLE:
return true;
default:
return false;
}
case TYPE_DOUBLE: return reader_type.type == TYPE_DOUBLE;
case TYPE_BOOLEAN: return reader_type.type == TYPE_BOOLEAN;
default:
DCHECK(false) << "NYI: " << writer_type.DebugString();
return false;
}
}
Status HdfsAvroScanner::InitNewRange() {
DCHECK(header_ != NULL);
only_parsing_header_ = false;
avro_header_ = reinterpret_cast<AvroFileHeader*>(header_);
template_tuple_ = avro_header_->template_tuple;
if (header_->is_compressed) {
RETURN_IF_ERROR(UpdateDecompressor(header_->compression_type));
}
if (avro_header_->use_codegend_decode_avro_data) {
codegend_decode_avro_data_ = reinterpret_cast<DecodeAvroDataFn>(
scan_node_->GetCodegenFn(THdfsFileFormat::AVRO));
}
if (codegend_decode_avro_data_ == NULL) {
scan_node_->IncNumScannersCodegenDisabled();
} else {
VLOG(2) << "HdfsAvroScanner (node_id=" << scan_node_->id()
<< ") using llvm codegend functions.";
scan_node_->IncNumScannersCodegenEnabled();
}
return Status::OK();
}
Status HdfsAvroScanner::ProcessRange() {
while (!finished()) {
int64_t num_records;
uint8_t* compressed_data;
int64_t compressed_size;
uint8_t* data;
int64_t data_len;
uint8_t* data_end;
// Read new data block
RETURN_IF_FALSE(
stream_->ReadZLong(&num_records, &parse_status_));
if (num_records < 0) {
return Status(TErrorCode::AVRO_INVALID_RECORD_COUNT, stream_->filename(),
num_records, stream_->file_offset());
}
DCHECK_GE(num_records, 0);
RETURN_IF_FALSE(stream_->ReadZLong(&compressed_size, &parse_status_));
if (compressed_size < 0) {
return Status(TErrorCode::AVRO_INVALID_COMPRESSED_SIZE, stream_->filename(),
compressed_size, stream_->file_offset());
}
RETURN_IF_FALSE(stream_->ReadBytes(
compressed_size, &compressed_data, &parse_status_));
if (header_->is_compressed) {
if (header_->compression_type == THdfsCompression::SNAPPY) {
// Snappy-compressed data block includes trailing 4-byte checksum,
// decompressor_ doesn't expect this
compressed_size -= SnappyDecompressor::TRAILING_CHECKSUM_LEN;
}
SCOPED_TIMER(decompress_timer_);
RETURN_IF_ERROR(decompressor_->ProcessBlock(false, compressed_size, compressed_data,
&data_len, &data));
VLOG_FILE << "Decompressed " << compressed_size << " to " << data_len;
} else {
data = compressed_data;
data_len = compressed_size;
}
data_end = data + data_len;
// Process block data
while (num_records > 0) {
SCOPED_TIMER(scan_node_->materialize_tuple_timer());
MemPool* pool;
Tuple* tuple;
TupleRow* tuple_row;
int max_tuples = GetMemory(&pool, &tuple, &tuple_row);
max_tuples = min<int64_t>(num_records, max_tuples);
int num_to_commit;
if (scan_node_->materialized_slots().empty()) {
// No slots to materialize (e.g. count(*)), no need to decode data
num_to_commit = WriteTemplateTuples(tuple_row, max_tuples);
} else {
if (codegend_decode_avro_data_ != NULL) {
num_to_commit = codegend_decode_avro_data_(this, max_tuples, pool, &data,
data_end, tuple, tuple_row);
} else {
num_to_commit = DecodeAvroData(max_tuples, pool, &data, data_end, tuple,
tuple_row);
}
}
RETURN_IF_ERROR(parse_status_);
RETURN_IF_ERROR(CommitRows(num_to_commit));
num_records -= max_tuples;
COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
if (scan_node_->ReachedLimit()) return Status::OK();
}
if (decompressor_.get() != NULL && !decompressor_->reuse_output_buffer()) {
RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), true));
}
RETURN_IF_ERROR(ReadSync());
}
return Status::OK();
}
bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
MemPool* pool, uint8_t** data, uint8_t* data_end, Tuple* tuple) {
DCHECK_EQ(record_schema.schema->type, AVRO_RECORD);
for (const AvroSchemaElement& element: record_schema.children) {
DCHECK_LE(*data, data_end);
const SlotDescriptor* slot_desc = element.slot_desc;
bool write_slot = false;
void* slot = NULL;
PrimitiveType slot_type = INVALID_TYPE;
if (slot_desc != NULL) {
write_slot = true;
slot = tuple->GetSlot(slot_desc->tuple_offset());
slot_type = slot_desc->type().type;
}
avro_type_t type = element.schema->type;
if (element.nullable()) {
bool is_null;
if (!ReadUnionType(element.null_union_position, data, data_end, &is_null)) {
return false;
}
if (is_null) type = AVRO_NULL;
}
bool success;
switch (type) {
case AVRO_NULL:
if (slot_desc != NULL) tuple->SetNull(slot_desc->null_indicator_offset());
success = true;
break;
case AVRO_BOOLEAN:
success = ReadAvroBoolean(slot_type, data, data_end, write_slot, slot, pool);
break;
case AVRO_INT32:
success = ReadAvroInt32(slot_type, data, data_end, write_slot, slot, pool);
break;
case AVRO_INT64:
success = ReadAvroInt64(slot_type, data, data_end, write_slot, slot, pool);
break;
case AVRO_FLOAT:
success = ReadAvroFloat(slot_type, data, data_end, write_slot, slot, pool);
break;
case AVRO_DOUBLE:
success = ReadAvroDouble(slot_type, data, data_end, write_slot, slot, pool);
break;
case AVRO_STRING:
case AVRO_BYTES:
if (slot_desc != NULL && slot_desc->type().type == TYPE_VARCHAR) {
success = ReadAvroVarchar(slot_type, slot_desc->type().len, data, data_end,
write_slot, slot, pool);
} else if (slot_desc != NULL && slot_desc->type().type == TYPE_CHAR) {
success = ReadAvroChar(slot_type, slot_desc->type().len, data, data_end,
write_slot, slot, pool);
} else {
success = ReadAvroString(slot_type, data, data_end, write_slot, slot, pool);
}
break;
case AVRO_DECIMAL: {
int slot_byte_size = 0;
if (slot_desc != NULL) {
DCHECK_EQ(slot_type, TYPE_DECIMAL);
slot_byte_size = slot_desc->type().GetByteSize();
}
success = ReadAvroDecimal(slot_byte_size, data, data_end, write_slot, slot, pool);
break;
}
case AVRO_RECORD:
success = MaterializeTuple(element, pool, data, data_end, tuple);
break;
default:
success = false;
DCHECK(false) << "Unsupported SchemaElement: " << type;
}
if (UNLIKELY(!success)) {
DCHECK(!parse_status_.ok());
return false;
}
}
return true;
}
void HdfsAvroScanner::SetStatusCorruptData(TErrorCode::type error_code) {
DCHECK(parse_status_.ok());
if (TestInfo::is_test()) {
parse_status_ = Status(error_code, "test file", 123);
} else {
parse_status_ = Status(error_code, stream_->filename(), stream_->file_offset());
}
}
void HdfsAvroScanner::SetStatusInvalidValue(TErrorCode::type error_code, int64_t len) {
DCHECK(parse_status_.ok());
if (TestInfo::is_test()) {
parse_status_ = Status(error_code, "test file", len, 123);
} else {
parse_status_ = Status(error_code, stream_->filename(), len, stream_->file_offset());
}
}
void HdfsAvroScanner::SetStatusValueOverflow(TErrorCode::type error_code, int64_t len,
int64_t limit) {
DCHECK(parse_status_.ok());
if (TestInfo::is_test()) {
parse_status_ = Status(error_code, "test file", len, limit, 123);
} else {
parse_status_ = Status(error_code, stream_->filename(), len, limit, stream_->file_offset());
}
}
// This function produces a codegen'd function equivalent to MaterializeTuple() but
// optimized for the table schema. Via helper functions CodegenReadRecord() and
// CodegenReadScalar(), it eliminates the conditionals necessary when interpreting the
// type of each element in the schema, instead generating code to handle each element in
// the schema. Example output with tpch.region:
//
// define i1 @MaterializeTuple(%"class.impala::HdfsAvroScanner"* %this,
// %"struct.impala::AvroSchemaElement"* %record_schema, %"class.impala::MemPool"* %pool,
// i8** %data, i8* %data_end, %"class.impala::Tuple"* %tuple) #33 {
// entry:
// %is_null_ptr = alloca i1
// %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple to { i8, [3 x i8], i32,
// %"struct.impala::StringValue", %"struct.impala::StringValue" }*
// %0 = bitcast i1* %is_null_ptr to i8*
// %read_union_ok = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPhPlPb(
// %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data, i8* %data_end, i8* %0)
// br i1 %read_union_ok, label %read_union_ok1, label %bail_out
//
// read_union_ok1: ; preds = %entry
// %is_null = load i1, i1* %is_null_ptr
// br i1 %is_null, label %null_field, label %read_field
//
// read_field: ; preds = %read_union_ok1
// %slot = getelementptr inbounds { i8, [3 x i8], i32, %"struct.impala::StringValue",
// %"struct.impala::StringValue" }, { i8, [3 x i8], i32, %"struct.impala::StringValue",
// %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 2
// %opaque_slot = bitcast i32* %slot to i8*
// %success = call i1
// @_ZN6impala15HdfsAvroScanner13ReadAvroInt32ENS_13PrimitiveTypeEPPhPlbPvPNS_7MemPoolE(
// %"class.impala::HdfsAvroScanner"* %this, i32 5, i8** %data, i8* %data_end,
// i1 true, i8* %opaque_slot, %"class.impala::MemPool"* %pool)
// br i1 %success, label %end_field, label %bail_out
//
// null_field: ; preds = %read_union_ok1
// call void @SetNull({ i8, [3 x i8], i32, %"struct.impala::StringValue",
// %"struct.impala::StringValue" }* %tuple_ptr)
// br label %end_field
//
// end_field: ; preds = %read_field, %null_field
// %1 = bitcast i1* %is_null_ptr to i8*
// %read_union_ok4 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPhPlPb(
// %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data, i8* %data_end, i8* %1)
// br i1 %read_union_ok4, label %read_union_ok5, label %bail_out
//
// read_union_ok5: ; preds = %end_field
// %is_null7 = load i1, i1* %is_null_ptr
// br i1 %is_null7, label %null_field6, label %read_field2
//
// read_field2: ; preds = %read_union_ok5
// %slot8 = getelementptr inbounds { i8, [3 x i8], i32, %"struct.impala::StringValue",
// %"struct.impala::StringValue" }, { i8, [3 x i8], i32, %"struct.impala::StringValue",
// %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 3
// %opaque_slot9 = bitcast %"struct.impala::StringValue"* %slot8 to i8*
// %success10 = call i1
// @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhPlbPvPNS_7MemPoolE(
// %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i8* %data_end,
// i1 true, i8* %opaque_slot9, %"class.impala::MemPool"* %pool)
// br i1 %success10, label %end_field3, label %bail_out
//
// null_field6: ; preds = %read_union_ok5
// call void @SetNull.1({ i8, [3 x i8], i32, %"struct.impala::StringValue",
// %"struct.impala::StringValue" }* %tuple_ptr)
// br label %end_field3
//
// end_field3: ; preds = %read_field2, %null_field6
// %2 = bitcast i1* %is_null_ptr to i8*
// %read_union_ok13 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPhPlPb(
// %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data, i8* %data_end, i8* %2)
// br i1 %read_union_ok13, label %read_union_ok14, label %bail_out
//
// read_union_ok14: ; preds = %end_field3
// %is_null16 = load i1, i1* %is_null_ptr
// br i1 %is_null16, label %null_field15, label %read_field11
//
// read_field11: ; preds = %read_union_ok14
// %slot17 = getelementptr inbounds { i8, [3 x i8], i32, %"struct.impala::StringValue",
// %"struct.impala::StringValue" }, { i8, [3 x i8], i32, %"struct.impala::StringValue",
// %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 4
// %opaque_slot18 = bitcast %"struct.impala::StringValue"* %slot17 to i8*
// %success19 = call i1
// @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhPlbPvPNS_7MemPoolE(
// %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i8* %data_end,
// i1 true, i8* %opaque_slot18, %"class.impala::MemPool"* %pool)
// br i1 %success19, label %end_field12, label %bail_out
//
// null_field15: ; preds = %read_union_ok14
// call void @SetNull.2({ i8, [3 x i8], i32, %"struct.impala::StringValue",
// %"struct.impala::StringValue" }* %tuple_ptr)
// br label %end_field12
//
// end_field12: ; preds = %read_field11, %null_field15
// ret i1 true
//
// bail_out: ; preds = %read_field11, %end_field3, %read_field2, %end_field,
// ret i1 false ; %read_field, %entry
// }
Status HdfsAvroScanner::CodegenMaterializeTuple(
HdfsScanNodeBase* node, LlvmCodeGen* codegen, Function** materialize_tuple_fn) {
LLVMContext& context = codegen->context();
LlvmBuilder builder(context);
Type* this_type = codegen->GetType(HdfsAvroScanner::LLVM_CLASS_NAME);
DCHECK(this_type != NULL);
PointerType* this_ptr_type = PointerType::get(this_type, 0);
TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc());
StructType* tuple_type = tuple_desc->GetLlvmStruct(codegen);
if (tuple_type == NULL) return Status("Could not generate tuple struct.");
Type* tuple_ptr_type = PointerType::get(tuple_type, 0);
Type* tuple_opaque_type = codegen->GetType(Tuple::LLVM_CLASS_NAME);
PointerType* tuple_opaque_ptr_type = PointerType::get(tuple_opaque_type, 0);
Type* data_ptr_type = PointerType::get(codegen->ptr_type(), 0); // char**
Type* mempool_type = PointerType::get(codegen->GetType(MemPool::LLVM_CLASS_NAME), 0);
Type* schema_element_type = codegen->GetPtrType(AvroSchemaElement::LLVM_CLASS_NAME);
LlvmCodeGen::FnPrototype prototype(codegen, "MaterializeTuple", codegen->boolean_type());
prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("record_schema", schema_element_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("pool", mempool_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("data", data_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("data_end", codegen->ptr_type()));
prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_opaque_ptr_type));
Value* args[6];
Function* fn = prototype.GeneratePrototype(&builder, args);
Value* this_val = args[0];
// Value* record_schema_val = args[1]; // don't need this
Value* pool_val = args[2];
Value* data_val = args[3];
Value* data_end_val = args[4];
Value* opaque_tuple_val = args[5];
Value* tuple_val = builder.CreateBitCast(opaque_tuple_val, tuple_ptr_type, "tuple_ptr");
// Create a bail out block to handle decoding failures.
BasicBlock* bail_out_block = BasicBlock::Create(context, "bail_out", fn, NULL);
Status status = CodegenReadRecord(
SchemaPath(), node->avro_schema(), node, codegen, &builder, fn, bail_out_block,
bail_out_block, this_val, pool_val, tuple_val, data_val, data_end_val);
if (!status.ok()) {
VLOG_QUERY << status.GetDetail();
fn->eraseFromParent();
return status;
}
// Returns true on successful decoding.
builder.CreateRet(codegen->true_value());
// Returns false on decoding errors.
builder.SetInsertPoint(bail_out_block);
builder.CreateRet(codegen->false_value());
*materialize_tuple_fn = codegen->FinalizeFunction(fn);
if (*materialize_tuple_fn == NULL) {
return Status("Failed to finalize materialize_tuple_fn.");
}
return Status::OK();
}
Status HdfsAvroScanner::CodegenReadRecord(
const SchemaPath& path, const AvroSchemaElement& record, HdfsScanNodeBase* node,
LlvmCodeGen* codegen, void* void_builder, Function* fn, BasicBlock* insert_before,
BasicBlock* bail_out, Value* this_val, Value* pool_val, Value* tuple_val,
Value* data_val, Value* data_end_val) {
if (record.schema == NULL) {
return Status("Missing Avro schema in scan node. This could be due to stale "
"metadata. Running 'invalidate metadata <tablename>' may resolve the problem.");
}
DCHECK_EQ(record.schema->type, AVRO_RECORD);
LLVMContext& context = codegen->context();
LlvmBuilder* builder = reinterpret_cast<LlvmBuilder*>(void_builder);
// Codegen logic for parsing each field and, if necessary, populating a slot with the
// result.
// Used to store result of ReadUnionType() call
Value* is_null_ptr = NULL;
for (int i = 0; i < record.children.size(); ++i) {
const AvroSchemaElement* field = &record.children[i];
int col_idx = i;
// If we're about to process the table-level columns, account for the partition keys
// when constructing 'path'
if (path.empty()) col_idx += node->num_partition_keys();
SchemaPath new_path = path;
new_path.push_back(col_idx);
int slot_idx = node->GetMaterializedSlotIdx(new_path);
SlotDescriptor* slot_desc = (slot_idx == HdfsScanNode::SKIP_COLUMN) ?
NULL : node->materialized_slots()[slot_idx];
// Block that calls appropriate Read<Type> function
BasicBlock* read_field_block =
BasicBlock::Create(context, "read_field", fn, insert_before);
// Block that handles a NULL value. We fill this in below if the field is nullable,
// otherwise we leave this block NULL.
BasicBlock* null_block = NULL;
// This is where we should end up after we're finished processing this field. Used to
// put the builder in the right place for the next field.
BasicBlock* end_field_block =
BasicBlock::Create(context, "end_field", fn, insert_before);
if (field->nullable()) {
// Field could be null. Create conditional branch based on ReadUnionType result.
Function* read_union_fn = codegen->GetFunction(IRFunction::READ_UNION_TYPE, false);
Value* null_union_pos_val =
codegen->GetIntConstant(TYPE_INT, field->null_union_position);
if (is_null_ptr == NULL) {
is_null_ptr = codegen->CreateEntryBlockAlloca(*builder, codegen->boolean_type(),
"is_null_ptr");
}
Value* is_null_ptr_cast = builder->CreateBitCast(is_null_ptr, codegen->ptr_type());
Value* read_union_ok = builder->CreateCall(read_union_fn,
ArrayRef<Value*>({this_val, null_union_pos_val, data_val, data_end_val,
is_null_ptr_cast}), "read_union_ok");
BasicBlock* read_union_ok_block = BasicBlock::Create(context, "read_union_ok", fn,
read_field_block);
builder->CreateCondBr(read_union_ok, read_union_ok_block, bail_out);
builder->SetInsertPoint(read_union_ok_block);
null_block = BasicBlock::Create(context, "null_field", fn, end_field_block);
Value* is_null = builder->CreateLoad(is_null_ptr, "is_null");
builder->CreateCondBr(is_null, null_block, read_field_block);
// Write null field IR
builder->SetInsertPoint(null_block);
if (slot_idx != HdfsScanNode::SKIP_COLUMN) {
slot_desc->CodegenSetNullIndicator(
codegen, builder, tuple_val, codegen->true_value());
}
// LLVM requires all basic blocks to end with a terminating instruction
builder->CreateBr(end_field_block);
} else {
// Field is never null, read field unconditionally.
builder->CreateBr(read_field_block);
}
// Write read_field_block IR
builder->SetInsertPoint(read_field_block);
Value *ret_val = nullptr;
if (field->schema->type == AVRO_RECORD) {
BasicBlock* insert_before_block =
(null_block != NULL) ? null_block : end_field_block;
RETURN_IF_ERROR(CodegenReadRecord(new_path, *field, node, codegen, builder, fn,
insert_before_block, bail_out, this_val, pool_val, tuple_val, data_val,
data_end_val));
} else {
RETURN_IF_ERROR(CodegenReadScalar(*field, slot_desc, codegen, builder,
this_val, pool_val, tuple_val, data_val, data_end_val, &ret_val));
}
builder->CreateCondBr(ret_val, end_field_block, bail_out);
// Set insertion point for next field.
builder->SetInsertPoint(end_field_block);
}
return Status::OK();
}
Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder, Value* this_val,
Value* pool_val, Value* tuple_val, Value* data_val, Value* data_end_val,
Value** ret_val) {
LlvmBuilder* builder = reinterpret_cast<LlvmBuilder*>(void_builder);
Function* read_field_fn;
switch (element.schema->type) {
case AVRO_BOOLEAN:
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_BOOLEAN, false);
break;
case AVRO_INT32:
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_INT32, false);
break;
case AVRO_INT64:
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_INT64, false);
break;
case AVRO_FLOAT:
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_FLOAT, false);
break;
case AVRO_DOUBLE:
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_DOUBLE, false);
break;
case AVRO_STRING:
case AVRO_BYTES:
if (slot_desc != NULL && slot_desc->type().type == TYPE_VARCHAR) {
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_VARCHAR, false);
} else if (slot_desc != NULL && slot_desc->type().type == TYPE_CHAR) {
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_CHAR, false);
} else {
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_STRING, false);
}
break;
case AVRO_DECIMAL:
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_DECIMAL, false);
break;
default:
return Status(Substitute(
"Failed to codegen MaterializeTuple() due to unsupported type: $0",
element.schema->type));
}
// Call appropriate ReadAvro<Type> function
Value* write_slot_val = builder->getFalse();
Value* slot_type_val = builder->getInt32(0);
Value* opaque_slot_val = codegen->null_ptr_value();
if (slot_desc != NULL) {
// Field corresponds to a materialized column, fill in relevant arguments
write_slot_val = builder->getTrue();
if (slot_desc->type().type == TYPE_DECIMAL) {
// ReadAvroDecimal() takes slot byte size instead of slot type
slot_type_val = builder->getInt32(slot_desc->type().GetByteSize());
} else {
slot_type_val = builder->getInt32(slot_desc->type().type);
}
Value* slot_val = builder->CreateStructGEP(NULL, tuple_val, slot_desc->llvm_field_idx(),
"slot");
opaque_slot_val =
builder->CreateBitCast(slot_val, codegen->ptr_type(), "opaque_slot");
}
// NOTE: ReadAvroVarchar/Char has different signature than rest of read functions
if (slot_desc != NULL &&
(slot_desc->type().type == TYPE_VARCHAR || slot_desc->type().type == TYPE_CHAR)) {
// Need to pass an extra argument (the length) to the codegen function.
Value* fixed_len = builder->getInt32(slot_desc->type().len);
Value* read_field_args[] = {this_val, slot_type_val, fixed_len, data_val,
data_end_val, write_slot_val, opaque_slot_val, pool_val};
*ret_val = builder->CreateCall(read_field_fn, read_field_args, "success");
} else {
Value* read_field_args[] = {this_val, slot_type_val, data_val, data_end_val,
write_slot_val, opaque_slot_val, pool_val};
*ret_val = builder->CreateCall(read_field_fn, read_field_args, "success");
}
return Status::OK();
}
Status HdfsAvroScanner::CodegenDecodeAvroData(LlvmCodeGen* codegen,
Function* materialize_tuple_fn, const vector<ExprContext*>& conjunct_ctxs,
Function** decode_avro_data_fn) {
SCOPED_TIMER(codegen->codegen_timer());
DCHECK(materialize_tuple_fn != NULL);
Function* fn = codegen->GetFunction(IRFunction::DECODE_AVRO_DATA, true);
int replaced = codegen->ReplaceCallSites(fn, materialize_tuple_fn, "MaterializeTuple");
DCHECK_EQ(replaced, 1);
Function* eval_conjuncts_fn;
RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjunct_ctxs,
&eval_conjuncts_fn));
replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, "EvalConjuncts");
DCHECK_EQ(replaced, 1);
fn->setName("DecodeAvroData");
*decode_avro_data_fn = codegen->FinalizeFunction(fn);
if (*decode_avro_data_fn == NULL) {
return Status("Failed to finalize decode_avro_data_fn.");
}
return Status::OK();
}