| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/hdfs-avro-scanner.h" |
| |
| #include <avro/errors.h> |
| #include <algorithm> // std::min |
| #include <avro/legacy.h> |
| #include <gutil/strings/substitute.h> |
| |
| #include "codegen/llvm-codegen.h" |
| #include "exec/hdfs-scan-node.h" |
| #include "exec/read-write-util.h" |
| #include "exec/scanner-context.inline.h" |
| #include "runtime/raw-value.h" |
| #include "runtime/runtime-state.h" |
| #include "util/codec.h" |
| #include "util/decompress.h" |
| #include "util/runtime-profile-counters.h" |
| #include "util/test-info.h" |
| |
| #include "common/names.h" |
| |
| // Note: the Avro C++ library uses exceptions for error handling. Any Avro |
| // function that may throw an exception must be placed in a try/catch block. |
| using namespace impala; |
| using namespace strings; |
| |
| const char* HdfsAvroScanner::LLVM_CLASS_NAME = "class.impala::HdfsAvroScanner"; |
| |
| const uint8_t HdfsAvroScanner::AVRO_VERSION_HEADER[4] = {'O', 'b', 'j', 1}; |
| |
| const string HdfsAvroScanner::AVRO_SCHEMA_KEY("avro.schema"); |
| const string HdfsAvroScanner::AVRO_CODEC_KEY("avro.codec"); |
| const string HdfsAvroScanner::AVRO_NULL_CODEC("null"); |
| const string HdfsAvroScanner::AVRO_SNAPPY_CODEC("snappy"); |
| const string HdfsAvroScanner::AVRO_DEFLATE_CODEC("deflate"); |
| |
| const string AVRO_MEM_LIMIT_EXCEEDED = "HdfsAvroScanner::$0() failed to allocate " |
| "$1 bytes for $2."; |
| |
| #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_ |
| |
| static Status CheckSchema(const AvroSchemaElement& avro_schema) { |
| if (avro_schema.schema == nullptr) { |
| return Status("Missing Avro schema in scan node. This could be due to stale " |
| "metadata. Running 'invalidate metadata <tablename>' may resolve the problem."); |
| } |
| return Status::OK(); |
| } |
| |
| HdfsAvroScanner::HdfsAvroScanner(HdfsScanNodeBase* scan_node, RuntimeState* state) |
| : BaseSequenceScanner(scan_node, state) { |
| } |
| |
| HdfsAvroScanner::HdfsAvroScanner() |
| : BaseSequenceScanner() { |
| DCHECK(TestInfo::is_test()); |
| } |
| |
| Status HdfsAvroScanner::Open(ScannerContext* context) { |
| RETURN_IF_ERROR(BaseSequenceScanner::Open(context)); |
| RETURN_IF_ERROR(CheckSchema(scan_node_->avro_schema())); |
| return Status::OK(); |
| } |
| |
| Status HdfsAvroScanner::Codegen(HdfsScanNodeBase* node, |
| const vector<ScalarExpr*>& conjuncts, llvm::Function** decode_avro_data_fn) { |
| *decode_avro_data_fn = nullptr; |
| DCHECK(node->runtime_state()->ShouldCodegen()); |
| LlvmCodeGen* codegen = node->runtime_state()->codegen(); |
| DCHECK(codegen != nullptr); |
| RETURN_IF_ERROR(CodegenDecodeAvroData(node, codegen, conjuncts, decode_avro_data_fn)); |
| DCHECK(*decode_avro_data_fn != nullptr); |
| return Status::OK(); |
| } |
| |
| BaseSequenceScanner::FileHeader* HdfsAvroScanner::AllocateFileHeader() { |
| AvroFileHeader* header = new AvroFileHeader(); |
| header->template_tuple = template_tuple_; |
| return header; |
| } |
| |
| Status HdfsAvroScanner::ReadFileHeader() { |
| DCHECK(only_parsing_header_); |
| avro_header_ = reinterpret_cast<AvroFileHeader*>(header_); |
| |
| // Check version header |
| uint8_t* header; |
| RETURN_IF_FALSE(stream_->ReadBytes( |
| sizeof(AVRO_VERSION_HEADER), &header, &parse_status_)); |
| if (memcmp(header, AVRO_VERSION_HEADER, sizeof(AVRO_VERSION_HEADER))) { |
| return Status(TErrorCode::AVRO_BAD_VERSION_HEADER, |
| stream_->filename(), ReadWriteUtil::HexDump(header, sizeof(AVRO_VERSION_HEADER))); |
| } |
| |
| // Decode relevant metadata (encoded as Avro map) |
| RETURN_IF_ERROR(ParseMetadata()); |
| |
| // Read file sync marker |
| uint8_t* sync; |
| RETURN_IF_FALSE(stream_->ReadBytes(SYNC_HASH_SIZE, &sync, &parse_status_)); |
| memcpy(header_->sync, sync, SYNC_HASH_SIZE); |
| |
| header_->header_size = stream_->total_bytes_returned() - SYNC_HASH_SIZE; |
| |
| // Transfer ownership so the memory remains valid for subsequent scanners that process |
| // the data portions of the file. |
| scan_node_->TransferToScanNodePool(template_tuple_pool_.get()); |
| return Status::OK(); |
| } |
| |
| Status HdfsAvroScanner::ParseMetadata() { |
| header_->is_compressed = false; |
| header_->compression_type = THdfsCompression::NONE; |
| |
| int64_t num_entries; |
| RETURN_IF_FALSE(stream_->ReadZLong(&num_entries, &parse_status_)); |
| if (num_entries < 1) { |
| return Status(TErrorCode::AVRO_INVALID_METADATA_COUNT, stream_->filename(), |
| num_entries, stream_->file_offset()); |
| } |
| |
| while (num_entries != 0) { |
| DCHECK_GT(num_entries, 0); |
| for (int i = 0; i < num_entries; ++i) { |
| // Decode Avro string-type key |
| string key; |
| uint8_t* key_buf; |
| int64_t key_len; |
| RETURN_IF_FALSE(stream_->ReadZLong(&key_len, &parse_status_)); |
| if (key_len < 0) { |
| return Status(TErrorCode::AVRO_INVALID_LENGTH, stream_->filename(), key_len, |
| stream_->file_offset()); |
| } |
| RETURN_IF_FALSE(stream_->ReadBytes(key_len, &key_buf, &parse_status_)); |
| key = string(reinterpret_cast<char*>(key_buf), key_len); |
| |
| // Decode Avro bytes-type value |
| uint8_t* value; |
| int64_t value_len; |
| RETURN_IF_FALSE(stream_->ReadZLong(&value_len, &parse_status_)); |
| if (value_len < 0) { |
| return Status(TErrorCode::AVRO_INVALID_LENGTH, stream_->filename(), value_len, |
| stream_->file_offset()); |
| } |
| RETURN_IF_FALSE(stream_->ReadBytes(value_len, &value, &parse_status_)); |
| |
| if (key == AVRO_SCHEMA_KEY) { |
| avro_schema_t raw_file_schema; |
| int error = avro_schema_from_json_length( |
| reinterpret_cast<char*>(value), value_len, &raw_file_schema); |
| if (error != 0) { |
| stringstream ss; |
| ss << "Failed to parse file schema: " << avro_strerror(); |
| return Status(ss.str()); |
| } |
| AvroSchemaElement* file_schema = avro_header_->schema.get(); |
| RETURN_IF_ERROR(AvroSchemaElement::ConvertSchema(raw_file_schema, file_schema)); |
| |
| RETURN_IF_ERROR(ResolveSchemas(scan_node_->avro_schema(), file_schema)); |
| |
| // We currently codegen a function only for the table schema. If this file's |
| // schema is different from the table schema, don't use the codegen'd function and |
| // use the interpreted path instead. |
| avro_header_->use_codegend_decode_avro_data = avro_schema_equal( |
| scan_node_->avro_schema().schema, file_schema->schema); |
| |
| } else if (key == AVRO_CODEC_KEY) { |
| string avro_codec(reinterpret_cast<char*>(value), value_len); |
| if (avro_codec != AVRO_NULL_CODEC) { |
| header_->is_compressed = true; |
| // This scanner doesn't use header_->codec (Avro doesn't use the |
| // Hadoop codec strings), but fill it in for logging |
| header_->codec = avro_codec; |
| if (avro_codec == AVRO_SNAPPY_CODEC) { |
| header_->compression_type = THdfsCompression::SNAPPY; |
| } else if (avro_codec == AVRO_DEFLATE_CODEC) { |
| header_->compression_type = THdfsCompression::DEFLATE; |
| } else { |
| return Status("Unknown Avro compression codec: " + avro_codec); |
| } |
| } |
| } else { |
| VLOG_ROW << "Skipping metadata entry: " << key; |
| } |
| } |
| RETURN_IF_FALSE(stream_->ReadZLong(&num_entries, &parse_status_)); |
| if (num_entries < 0) { |
| return Status(TErrorCode::AVRO_INVALID_METADATA_COUNT, stream_->filename(), |
| num_entries, stream_->file_offset()); |
| } |
| } |
| |
| VLOG_FILE << stream_->filename() << ": " |
| << (header_->is_compressed ? "compressed" : "not compressed"); |
| if (header_->is_compressed) VLOG_FILE << header_->codec; |
| if (avro_header_->schema->children.empty()) { |
| return Status("Schema not found in file header metadata"); |
| } |
| return Status::OK(); |
| } |
| |
| // Schema resolution is performed per materialized slot (meaning we don't perform schema |
| // resolution for non-materialized columns). For each slot, we traverse the table schema |
| // using the column path (i.e., the traversal is by ordinal). We simultaneously traverse |
| // the file schema using the table schema's field names. The final field should exist in |
| // both schemas and be promotable to the slot type. If the file schema is missing a field, |
| // we check for a default value in the table schema and use that instead. |
| // TODO: test unresolvable schemas |
| // TODO: improve error messages |
| Status HdfsAvroScanner::ResolveSchemas(const AvroSchemaElement& table_root, |
| AvroSchemaElement* file_root) { |
| if (table_root.schema->type != AVRO_RECORD) return Status("Table schema is not a record"); |
| if (file_root->schema->type != AVRO_RECORD) return Status("File schema is not a record"); |
| |
| // Associate each slot descriptor with a field in the file schema, or fill in the |
| // template tuple with a default value from the table schema. |
| for (SlotDescriptor* slot_desc: scan_node_->materialized_slots()) { |
| // Traverse the column path, simultaneously traversing the table schema by ordinal and |
| // the file schema by field name from the table schema. |
| const SchemaPath& path = slot_desc->col_path(); |
| const AvroSchemaElement* table_record = &table_root; |
| AvroSchemaElement* file_record = file_root; |
| |
| for (int i = 0; i < path.size(); ++i) { |
| int table_field_idx = i > 0 ? path[i] : path[i] - scan_node_->num_partition_keys(); |
| int num_fields = table_record->children.size(); |
| if (table_field_idx >= num_fields) { |
| // TODO: add path to error message (and elsewhere) |
| return Status(TErrorCode::AVRO_MISSING_FIELD, table_field_idx, num_fields); |
| } |
| |
| const char* field_name = |
| avro_schema_record_field_name(table_record->schema, table_field_idx); |
| int file_field_idx = |
| avro_schema_record_field_get_index(file_record->schema, field_name); |
| |
| if (file_field_idx < 0) { |
| // This field doesn't exist in the file schema. Check if there is a default value. |
| avro_datum_t default_value = |
| avro_schema_record_field_default(table_record->schema, table_field_idx); |
| if (default_value == nullptr) { |
| return Status(TErrorCode::AVRO_MISSING_DEFAULT, field_name); |
| } |
| RETURN_IF_ERROR(WriteDefaultValue(slot_desc, default_value, field_name)); |
| DCHECK_EQ(i, path.size() - 1) << |
| "WriteDefaultValue() doesn't support default records yet, should have failed"; |
| continue; |
| } |
| |
| const AvroSchemaElement& table_field = table_record->children[table_field_idx]; |
| AvroSchemaElement& file_field = file_record->children[file_field_idx]; |
| RETURN_IF_ERROR(VerifyTypesMatch(table_field, file_field, field_name)); |
| |
| if (i != path.size() - 1) { |
| // All but the last index in 'path' should be a record field |
| if (table_record->schema->type != AVRO_RECORD) { |
| return Status(TErrorCode::AVRO_NOT_A_RECORD, field_name); |
| } else { |
| DCHECK_EQ(file_record->schema->type, AVRO_RECORD); |
| } |
| table_record = &table_field; |
| file_record = &file_field; |
| } else { |
| // This should be the field corresponding to 'slot_desc'. Check that slot_desc can |
| // be resolved to the table's Avro schema. |
| RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, table_field.schema)); |
| file_field.slot_desc = slot_desc; |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status HdfsAvroScanner::WriteDefaultValue( |
| SlotDescriptor* slot_desc, avro_datum_t default_value, const char* field_name) { |
| if (avro_header_->template_tuple == nullptr) { |
| if (template_tuple_ != nullptr) { |
| avro_header_->template_tuple = template_tuple_; |
| } else { |
| avro_header_->template_tuple = |
| Tuple::Create(tuple_byte_size_, template_tuple_pool_.get()); |
| } |
| } |
| switch (default_value->type) { |
| case AVRO_BOOLEAN: { |
| // We don't call VerifyTypesMatch() above the switch statement so we don't want to |
| // call it in the default case (since we VerifyTypesMatch() can't handle every type |
| // either, and we want to return the correct error message). |
| RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value)); |
| int8_t v; |
| if (avro_boolean_get(default_value, &v)) DCHECK(false); |
| RawValue::Write(&v, avro_header_->template_tuple, slot_desc, nullptr); |
| break; |
| } |
| case AVRO_INT32: |
| case AVRO_DATE: { |
| RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value)); |
| int32_t v; |
| if (avro_int32_get(default_value, &v)) DCHECK(false); |
| RawValue::Write(&v, avro_header_->template_tuple, slot_desc, nullptr); |
| break; |
| } |
| case AVRO_INT64: { |
| RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value)); |
| int64_t v; |
| if (avro_int64_get(default_value, &v)) DCHECK(false); |
| RawValue::Write(&v, avro_header_->template_tuple, slot_desc, nullptr); |
| break; |
| } |
| case AVRO_FLOAT: { |
| RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value)); |
| float v; |
| if (avro_float_get(default_value, &v)) DCHECK(false); |
| RawValue::Write(&v, avro_header_->template_tuple, slot_desc, nullptr); |
| break; |
| } |
| case AVRO_DOUBLE: { |
| RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value)); |
| double v; |
| if (avro_double_get(default_value, &v)) DCHECK(false); |
| RawValue::Write(&v, avro_header_->template_tuple, slot_desc, nullptr); |
| break; |
| } |
| case AVRO_STRING: |
| case AVRO_BYTES: { |
| RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value)); |
| char* v; |
| if (avro_string_get(default_value, &v)) DCHECK(false); |
| StringValue sv(v); |
| RawValue::Write(&sv, avro_header_->template_tuple, slot_desc, |
| template_tuple_pool_.get()); |
| break; |
| } |
| case AVRO_NULL: |
| RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value)); |
| avro_header_->template_tuple->SetNull(slot_desc->null_indicator_offset()); |
| break; |
| default: |
| return Status(TErrorCode::AVRO_UNSUPPORTED_DEFAULT_VALUE, field_name, |
| avro_type_name(default_value->type)); |
| } |
| return Status::OK(); |
| } |
| |
| Status HdfsAvroScanner::VerifyTypesMatch(const AvroSchemaElement& table_schema, |
| const AvroSchemaElement& file_schema, const string& field_name) { |
| if (!table_schema.nullable() && file_schema.nullable()) { |
| // Use ErrorMsg because corresponding Status ctor is ambiguous |
| ErrorMsg msg(TErrorCode::AVRO_NULLABILITY_MISMATCH, field_name); |
| return Status(msg); |
| } |
| |
| if (file_schema.schema->type == AVRO_NULL) { |
| if (table_schema.schema->type == AVRO_NULL || table_schema.nullable()) { |
| return Status::OK(); |
| } else { |
| return Status(TErrorCode::AVRO_SCHEMA_RESOLUTION_ERROR, field_name, |
| avro_type_name(table_schema.schema->type), |
| avro_type_name(file_schema.schema->type)); |
| } |
| } |
| |
| // Can't convert records to ColumnTypes, check here instead of below |
| // TODO: update if/when we have TYPE_STRUCT primitive type |
| if ((table_schema.schema->type == AVRO_RECORD) ^ |
| (file_schema.schema->type == AVRO_RECORD)) { |
| return Status(TErrorCode::AVRO_SCHEMA_RESOLUTION_ERROR, field_name, |
| avro_type_name(table_schema.schema->type), |
| avro_type_name(file_schema.schema->type)); |
| } else if (table_schema.schema->type == AVRO_RECORD) { |
| DCHECK_EQ(file_schema.schema->type, AVRO_RECORD); |
| return Status::OK(); |
| } |
| |
| ColumnType reader_type; |
| RETURN_IF_ERROR(AvroSchemaToColumnType(table_schema.schema, field_name, &reader_type)); |
| ColumnType writer_type; |
| RETURN_IF_ERROR(AvroSchemaToColumnType(file_schema.schema, field_name, &writer_type)); |
| bool match = VerifyTypesMatch(reader_type, writer_type); |
| if (match) return Status::OK(); |
| return Status(TErrorCode::AVRO_SCHEMA_RESOLUTION_ERROR, field_name, |
| avro_type_name(table_schema.schema->type), |
| avro_type_name(file_schema.schema->type)); |
| } |
| |
| Status HdfsAvroScanner::VerifyTypesMatch(SlotDescriptor* slot_desc, avro_obj_t* schema) { |
| // TODO: make this work for nested fields |
| const string& col_name = |
| scan_node_->hdfs_table()->col_descs()[slot_desc->col_pos()].name(); |
| |
| // All Impala types are nullable |
| if (schema->type == AVRO_NULL) return Status::OK(); |
| |
| // Can't convert records to ColumnTypes, check here instead of below |
| // TODO: update if/when we have TYPE_STRUCT primitive type |
| if (schema->type == AVRO_RECORD) { |
| return Status(TErrorCode::AVRO_SCHEMA_METADATA_MISMATCH, col_name, |
| slot_desc->type().DebugString(), avro_type_name(schema->type)); |
| } |
| |
| ColumnType file_type; |
| RETURN_IF_ERROR(AvroSchemaToColumnType(schema, col_name, &file_type)); |
| bool match = VerifyTypesMatch(slot_desc->type(), file_type); |
| if (match) return Status::OK(); |
| return Status(TErrorCode::AVRO_SCHEMA_METADATA_MISMATCH, col_name, |
| slot_desc->type().DebugString(), avro_type_name(schema->type)); |
| } |
| |
| bool HdfsAvroScanner::VerifyTypesMatch( |
| const ColumnType& reader_type, const ColumnType& writer_type) { |
| switch (writer_type.type) { |
| case TYPE_DECIMAL: |
| if (reader_type.type != TYPE_DECIMAL) return false; |
| if (reader_type.scale != writer_type.scale) return false; |
| if (reader_type.precision != writer_type.precision) return false; |
| return true; |
| case TYPE_STRING: return reader_type.IsStringType(); |
| case TYPE_INT: |
| case TYPE_DATE: |
| switch(reader_type.type) { |
| case TYPE_INT: |
| case TYPE_DATE: |
| // Type promotion |
| case TYPE_BIGINT: |
| case TYPE_FLOAT: |
| case TYPE_DOUBLE: |
| return true; |
| default: |
| return false; |
| } |
| case TYPE_BIGINT: |
| switch(reader_type.type) { |
| case TYPE_BIGINT: |
| // Type promotion |
| case TYPE_FLOAT: |
| case TYPE_DOUBLE: |
| return true; |
| default: |
| return false; |
| } |
| case TYPE_FLOAT: |
| switch(reader_type.type) { |
| case TYPE_FLOAT: |
| // Type promotion |
| case TYPE_DOUBLE: |
| return true; |
| default: |
| return false; |
| } |
| case TYPE_DOUBLE: return reader_type.type == TYPE_DOUBLE; |
| case TYPE_BOOLEAN: return reader_type.type == TYPE_BOOLEAN; |
| default: |
| DCHECK(false) << "NYI: " << writer_type.DebugString(); |
| return false; |
| } |
| } |
| |
| Status HdfsAvroScanner::InitNewRange() { |
| DCHECK(header_ != nullptr); |
| only_parsing_header_ = false; |
| avro_header_ = reinterpret_cast<AvroFileHeader*>(header_); |
| template_tuple_ = avro_header_->template_tuple; |
| if (header_->is_compressed) { |
| RETURN_IF_ERROR(UpdateDecompressor(header_->compression_type)); |
| } |
| |
| if (avro_header_->use_codegend_decode_avro_data) { |
| codegend_decode_avro_data_ = reinterpret_cast<DecodeAvroDataFn>( |
| scan_node_->GetCodegenFn(THdfsFileFormat::AVRO)); |
| } |
| if (codegend_decode_avro_data_ == nullptr) { |
| scan_node_->IncNumScannersCodegenDisabled(); |
| } else { |
| VLOG(2) << "HdfsAvroScanner (node_id=" << scan_node_->id() |
| << ") using llvm codegend functions."; |
| scan_node_->IncNumScannersCodegenEnabled(); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status HdfsAvroScanner::ProcessRange(RowBatch* row_batch) { |
| // Process blocks until we hit eos, the limit or the batch fills up. Check |
| // AtCapacity() at the end of the loop to guarantee that we process at least one row |
| // so that we make progress even if the batch starts off with AtCapacity() == true, |
| // which can happen if the tuple buffer is > 8MB. |
| DCHECK_GT(row_batch->capacity(), row_batch->num_rows()); |
| while (!eos_ && !scan_node_->ReachedLimitShared()) { |
| if (record_pos_ == num_records_in_block_) { |
| // Read new data block |
| RETURN_IF_FALSE(stream_->ReadZLong(&num_records_in_block_, &parse_status_)); |
| if (num_records_in_block_ < 0) { |
| return Status(TErrorCode::AVRO_INVALID_RECORD_COUNT, stream_->filename(), |
| num_records_in_block_, stream_->file_offset()); |
| } |
| int64_t compressed_size; |
| RETURN_IF_FALSE(stream_->ReadZLong(&compressed_size, &parse_status_)); |
| if (compressed_size < 0) { |
| return Status(TErrorCode::AVRO_INVALID_COMPRESSED_SIZE, stream_->filename(), |
| compressed_size, stream_->file_offset()); |
| } |
| uint8_t* compressed_data; |
| RETURN_IF_FALSE(stream_->ReadBytes( |
| compressed_size, &compressed_data, &parse_status_)); |
| |
| if (header_->is_compressed) { |
| if (header_->compression_type == THdfsCompression::SNAPPY) { |
| // Snappy-compressed data block includes trailing 4-byte checksum, |
| // decompressor_ doesn't expect this |
| compressed_size -= SnappyDecompressor::TRAILING_CHECKSUM_LEN; |
| } |
| SCOPED_TIMER(decompress_timer_); |
| RETURN_IF_ERROR(decompressor_->ProcessBlock(false, compressed_size, |
| compressed_data, &data_block_len_, &data_block_)); |
| } else { |
| data_block_ = compressed_data; |
| data_block_len_ = compressed_size; |
| } |
| data_block_end_ = data_block_ + data_block_len_; |
| record_pos_ = 0; |
| } |
| |
| int64_t prev_record_pos = record_pos_; |
| int block_start_row = row_batch->num_rows(); |
| // Process the remaining data in the current block. Always process at least one row |
| // to ensure we make progress even if the batch starts off with AtCapacity() == true. |
| DCHECK_GT(row_batch->capacity(), row_batch->num_rows()); |
| while (record_pos_ != num_records_in_block_) { |
| SCOPED_TIMER(scan_node_->materialize_tuple_timer()); |
| |
| Tuple* tuple = tuple_; |
| TupleRow* tuple_row = row_batch->GetRow(row_batch->AddRow()); |
| int max_tuples = row_batch->capacity() - row_batch->num_rows(); |
| max_tuples = min<int64_t>(max_tuples, num_records_in_block_ - record_pos_); |
| int num_to_commit; |
| if (scan_node_->materialized_slots().empty()) { |
| // No slots to materialize (e.g. count(*)), no need to decode data |
| num_to_commit = WriteTemplateTuples(tuple_row, max_tuples); |
| } else if (codegend_decode_avro_data_ != nullptr) { |
| num_to_commit = codegend_decode_avro_data_(this, max_tuples, |
| row_batch->tuple_data_pool(), &data_block_, data_block_end_, tuple, tuple_row); |
| } else { |
| num_to_commit = DecodeAvroData(max_tuples, row_batch->tuple_data_pool(), |
| &data_block_, data_block_end_, tuple, tuple_row); |
| } |
| RETURN_IF_ERROR(parse_status_); |
| RETURN_IF_ERROR(CommitRows(num_to_commit, row_batch)); |
| record_pos_ += max_tuples; |
| COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples); |
| if (row_batch->AtCapacity() || scan_node_->ReachedLimitShared()) break; |
| } |
| |
| if (record_pos_ == num_records_in_block_) { |
| if (decompressor_.get() != nullptr && !decompressor_->reuse_output_buffer()) { |
| if (prev_record_pos == 0 && row_batch->num_rows() == block_start_row) { |
| // Did not return any rows from current block in this or a previous |
| // ProcessRange() call - we can recycle the memory. This optimisation depends on |
| // passing keep_current_chunk = false to the AcquireData() call below, so that |
| // the current chunk only contains data for the current Avro block. |
| data_buffer_pool_->Clear(); |
| } else { |
| // Returned rows may reference data buffers - need to attach to batch. |
| row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false); |
| } |
| } |
| RETURN_IF_ERROR(ReadSync()); |
| } |
| if (row_batch->AtCapacity()) break; |
| } |
| return Status::OK(); |
| } |
| |
| bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema, |
| MemPool* pool, uint8_t** data, uint8_t* data_end, Tuple* tuple) { |
| DCHECK_EQ(record_schema.schema->type, AVRO_RECORD); |
| for (const AvroSchemaElement& element: record_schema.children) { |
| DCHECK_LE(*data, data_end); |
| |
| const SlotDescriptor* slot_desc = element.slot_desc; |
| bool write_slot = false; |
| void* slot = nullptr; |
| PrimitiveType slot_type = INVALID_TYPE; |
| if (slot_desc != nullptr) { |
| write_slot = true; |
| slot = tuple->GetSlot(slot_desc->tuple_offset()); |
| slot_type = slot_desc->type().type; |
| } |
| |
| avro_type_t type = element.schema->type; |
| if (element.nullable()) { |
| bool is_null; |
| if (!ReadUnionType(element.null_union_position, data, data_end, &is_null)) { |
| return false; |
| } |
| if (is_null) type = AVRO_NULL; |
| } |
| |
| bool success; |
| switch (type) { |
| case AVRO_NULL: |
| if (slot_desc != nullptr) tuple->SetNull(slot_desc->null_indicator_offset()); |
| success = true; |
| break; |
| case AVRO_BOOLEAN: |
| success = ReadAvroBoolean(slot_type, data, data_end, write_slot, slot, pool); |
| break; |
| case AVRO_DATE: |
| case AVRO_INT32: |
| if (slot_type == TYPE_DATE) { |
| success = ReadAvroDate(slot_type, data, data_end, write_slot, slot, pool); |
| } else { |
| success = ReadAvroInt32(slot_type, data, data_end, write_slot, slot, pool); |
| } |
| break; |
| case AVRO_INT64: |
| success = ReadAvroInt64(slot_type, data, data_end, write_slot, slot, pool); |
| break; |
| case AVRO_FLOAT: |
| success = ReadAvroFloat(slot_type, data, data_end, write_slot, slot, pool); |
| break; |
| case AVRO_DOUBLE: |
| success = ReadAvroDouble(slot_type, data, data_end, write_slot, slot, pool); |
| break; |
| case AVRO_STRING: |
| case AVRO_BYTES: |
| if (slot_desc != nullptr && slot_desc->type().type == TYPE_VARCHAR) { |
| success = ReadAvroVarchar(slot_type, slot_desc->type().len, data, data_end, |
| write_slot, slot, pool); |
| } else if (slot_desc != nullptr && slot_desc->type().type == TYPE_CHAR) { |
| success = ReadAvroChar(slot_type, slot_desc->type().len, data, data_end, |
| write_slot, slot, pool); |
| } else { |
| success = ReadAvroString(slot_type, data, data_end, write_slot, slot, pool); |
| } |
| break; |
| case AVRO_DECIMAL: { |
| int slot_byte_size = 0; |
| if (slot_desc != nullptr) { |
| DCHECK_EQ(slot_type, TYPE_DECIMAL); |
| slot_byte_size = slot_desc->type().GetByteSize(); |
| } |
| success = ReadAvroDecimal(slot_byte_size, data, data_end, write_slot, slot, pool); |
| break; |
| } |
| case AVRO_RECORD: |
| success = MaterializeTuple(element, pool, data, data_end, tuple); |
| break; |
| default: |
| success = false; |
| DCHECK(false) << "Unsupported SchemaElement: " << type; |
| } |
| if (UNLIKELY(!success)) { |
| DCHECK(!parse_status_.ok()); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| void HdfsAvroScanner::SetStatusCorruptData(TErrorCode::type error_code) { |
| DCHECK(parse_status_.ok()); |
| if (TestInfo::is_test()) { |
| parse_status_ = Status(error_code, "test file", 123); |
| } else { |
| parse_status_ = Status(error_code, stream_->filename(), stream_->file_offset()); |
| } |
| } |
| |
| void HdfsAvroScanner::SetStatusInvalidValue(TErrorCode::type error_code, int64_t len) { |
| DCHECK(parse_status_.ok()); |
| if (TestInfo::is_test()) { |
| parse_status_ = Status(error_code, "test file", len, 123); |
| } else { |
| parse_status_ = Status(error_code, stream_->filename(), len, stream_->file_offset()); |
| } |
| } |
| |
| void HdfsAvroScanner::SetStatusValueOverflow(TErrorCode::type error_code, int64_t len, |
| int64_t limit) { |
| DCHECK(parse_status_.ok()); |
| if (TestInfo::is_test()) { |
| parse_status_ = Status(error_code, "test file", len, limit, 123); |
| } else { |
| parse_status_ = Status(error_code, stream_->filename(), len, limit, |
| stream_->file_offset()); |
| } |
| } |
| |
| // This function produces a codegen'd function equivalent to MaterializeTuple() but |
| // optimized for the table schema. Via helper functions CodegenReadRecord() and |
| // CodegenReadScalar(), it eliminates the conditionals necessary when interpreting the |
| // type of each element in the schema, instead generating code to handle each element in |
| // the schema. |
| // |
| // To avoid overly long codegen times for wide schemas, this function generates |
| // one function per 200 columns, and a function that calls them all together. |
| // |
| // |
| // Example output with 'select count(*) from tpch_avro.region': |
| // |
| // define i1 @MaterializeTuple-helper0(%"class.impala::HdfsAvroScanner"* %this, %"struct.impala::AvroSchemaElement"* %record_schema, %"class.impala::MemPool"* %pool, i8** %data, i8* %data_end, %"class.impala::Tuple"* %tuple) #34 { |
| // entry: |
| // %is_null_ptr = alloca i1 |
| // %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple to <{}>* |
| // %0 = bitcast i1* %is_null_ptr to i8* |
| // %read_union_ok = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPhS1_Pb(%"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data, i8* %data_end, i8* %0) |
| // br i1 %read_union_ok, label %read_union_ok1, label %bail_out |
| // |
| // read_union_ok1: ; preds = %entry |
| // %is_null = load i1, i1* %is_null_ptr |
| // br i1 %is_null, label %null_field, label %read_field |
| // |
| // read_field: ; preds = %read_union_ok1 |
| // %success = call i1 @_ZN6impala15HdfsAvroScanner13ReadAvroInt32ENS_13PrimitiveTypeEPPhS2_bPvPNS_7MemPoolE(%"class.impala::HdfsAvroScanner"* %this, i32 0, i8** %data, i8* %data_end, i1 false, i8* null, %"class.impala::MemPool"* %pool) |
| // br i1 %success, label %end_field, label %bail_out |
| // |
| // null_field: ; preds = %read_union_ok1 |
| // br label %end_field |
| // |
| // end_field: ; preds = %read_field, %null_field |
| // %1 = bitcast i1* %is_null_ptr to i8* |
| // %read_union_ok4 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPhS1_Pb(%"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data, i8* %data_end, i8* %1) |
| // br i1 %read_union_ok4, label %read_union_ok5, label %bail_out |
| // |
| // read_union_ok5: ; preds = %end_field |
| // %is_null7 = load i1, i1* %is_null_ptr |
| // br i1 %is_null7, label %null_field6, label %read_field2 |
| // |
| // read_field2: ; preds = %read_union_ok5 |
| // %success8 = call i1 @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhS2_bPvPNS_7MemPoolE(%"class.impala::HdfsAvroScanner"* %this, i32 0, i8** %data, i8* %data_end, i1 false, i8* null, %"class.impala::MemPool"* %pool) |
| // br i1 %success8, label %end_field3, label %bail_out |
| // |
| // null_field6: ; preds = %read_union_ok5 |
| // br label %end_field3 |
| // |
| // end_field3: ; preds = %read_field2, %null_field6 |
| // %2 = bitcast i1* %is_null_ptr to i8* |
| // %read_union_ok11 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPhS1_Pb(%"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data, i8* %data_end, i8* %2) |
| // br i1 %read_union_ok11, label %read_union_ok12, label %bail_out |
| // |
| // read_union_ok12: ; preds = %end_field3 |
| // %is_null14 = load i1, i1* %is_null_ptr |
| // br i1 %is_null14, label %null_field13, label %read_field9 |
| // |
| // read_field9: ; preds = %read_union_ok12 |
| // %success15 = call i1 @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhS2_bPvPNS_7MemPoolE(%"class.impala::HdfsAvroScanner"* %this, i32 0, i8** %data, i8* %data_end, i1 false, i8* null, %"class.impala::MemPool"* %pool) |
| // br i1 %success15, label %end_field10, label %bail_out |
| // |
| // null_field13: ; preds = %read_union_ok12 |
| // br label %end_field10 |
| // |
| // end_field10: ; preds = %read_field9, %null_field13 |
| // ret i1 true |
| // |
| // bail_out: ; preds = %read_field9, %end_field3, %read_field2, %end_field, %read_field, %entry |
| // ret i1 false |
| // } |
| // |
| // define i1 @MaterializeTuple(%"class.impala::HdfsAvroScanner"* %this, %"struct.impala::AvroSchemaElement"* %record_schema, %"class.impala::MemPool"* %pool, i8** %data, i8* %data_end, %"class.impala::Tuple"* %tuple) #34 { |
| // entry: |
| // %helper_01 = call i1 @MaterializeTuple-helper0(%"class.impala::HdfsAvroScanner"* %this, %"struct.impala::AvroSchemaElement"* %record_schema, %"class.impala::MemPool"* %pool, i8** %data, i8* %data_end, %"class.impala::Tuple"* %tuple) |
| // br i1 %helper_01, label %helper_0, label %bail_out |
| // |
| // helper_0: ; preds = %entry |
| // ret i1 true |
| // |
| // bail_out: ; preds = %entry |
| // ret i1 false |
| // } |
| Status HdfsAvroScanner::CodegenMaterializeTuple(const HdfsScanNodeBase* node, |
| LlvmCodeGen* codegen, llvm::Function** materialize_tuple_fn) { |
| llvm::LLVMContext& context = codegen->context(); |
| LlvmBuilder builder(context); |
| |
| llvm::PointerType* this_ptr_type = codegen->GetStructPtrType<HdfsAvroScanner>(); |
| |
| TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc()); |
| llvm::StructType* tuple_type = tuple_desc->GetLlvmStruct(codegen); |
| if (tuple_type == nullptr) return Status("Could not generate tuple struct."); |
| llvm::Type* tuple_ptr_type = llvm::PointerType::get(tuple_type, 0); |
| |
| llvm::PointerType* tuple_opaque_ptr_type = codegen->GetStructPtrType<Tuple>(); |
| |
| llvm::Type* data_ptr_type = codegen->ptr_ptr_type(); // char** |
| llvm::Type* mempool_type = codegen->GetStructPtrType<MemPool>(); |
| llvm::Type* schema_element_type = codegen->GetStructPtrType<AvroSchemaElement>(); |
| |
| // Schema can be null if metadata is stale. See test in |
| // queries/QueryTest/avro-schema-changes.test. |
| RETURN_IF_ERROR(CheckSchema(node->avro_schema())); |
| int num_children = node->avro_schema().children.size(); |
| if (num_children == 0) { |
| return Status("Invalid Avro record schema: contains no children."); |
| } |
| // We handle step_size columns per function. This minimizes LLVM |
| // optimization time and was determined empirically. If there are |
| // too many functions, it takes LLVM longer to optimize. If the functions |
| // are too long, it takes LLVM longer too. |
| int step_size = 200; |
| std::vector<llvm::Function*> helper_functions; |
| |
| // prototype re-used several times by amending with SetName() |
| LlvmCodeGen::FnPrototype prototype(codegen, "", codegen->bool_type()); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_ptr_type)); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("record_schema", schema_element_type)); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("pool", mempool_type)); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("data", data_ptr_type)); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("data_end", codegen->ptr_type())); |
| prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_opaque_ptr_type)); |
| |
| // Generate helper functions for every step_size columns. |
| for (int i = 0; i < num_children; i += step_size) { |
| prototype.SetName("MaterializeTuple-helper" + std::to_string(i)); |
| llvm::Value* args[6]; |
| llvm::Function* helper_fn = prototype.GeneratePrototype(&builder, args); |
| |
| llvm::Value* this_val = args[0]; |
| // llvm::Value* record_schema_val = args[1]; // don't need this |
| llvm::Value* pool_val = args[2]; |
| llvm::Value* data_val = args[3]; |
| llvm::Value* data_end_val = args[4]; |
| llvm::Value* opaque_tuple_val = args[5]; |
| |
| llvm::Value* tuple_val = |
| builder.CreateBitCast(opaque_tuple_val, tuple_ptr_type, "tuple_ptr"); |
| |
| // Create a bail out block to handle decoding failures. |
| llvm::BasicBlock* bail_out_block = |
| llvm::BasicBlock::Create(context, "bail_out", helper_fn, nullptr); |
| |
| Status status = CodegenReadRecord( |
| SchemaPath(), node->avro_schema(), i, std::min(num_children, i + step_size), |
| node, codegen, &builder, helper_fn, bail_out_block, |
| bail_out_block, this_val, pool_val, tuple_val, data_val, data_end_val); |
| if (!status.ok()) { |
| VLOG_QUERY << status.GetDetail(); |
| return status; |
| } |
| |
| // Returns true on successful decoding. |
| builder.CreateRet(codegen->true_value()); |
| |
| // Returns false on decoding errors. |
| builder.SetInsertPoint(bail_out_block); |
| builder.CreateRet(codegen->false_value()); |
| |
| if (codegen->FinalizeFunction(helper_fn) == nullptr) { |
| return Status("Failed to finalize helper_fn."); |
| } |
| helper_functions.push_back(helper_fn); |
| } |
| |
| // Actual MaterializeTuple. Call all the helper functions. |
| { |
| llvm::Value* args[6]; |
| prototype.SetName("MaterializeTuple"); |
| llvm::Function* fn = prototype.GeneratePrototype(&builder, args); |
| |
| // These are the blocks that we go to after the helper runs. |
| std::vector<llvm::BasicBlock*> helper_blocks; |
| for (int i = 0; i < helper_functions.size(); ++i) { |
| llvm::BasicBlock* helper_block = |
| llvm::BasicBlock::Create(context, "helper_" + std::to_string(i), fn, nullptr); |
| helper_blocks.push_back(helper_block); |
| } |
| |
| // Block for failures |
| llvm::BasicBlock* bail_out_block = |
| llvm::BasicBlock::Create(context, "bail_out", fn, nullptr); |
| |
| // Call the helpers. |
| for (int i = 0; i < helper_functions.size(); ++i) { |
| if (i != 0) builder.SetInsertPoint(helper_blocks[i - 1]); |
| llvm::Function* fnHelper = helper_functions[i]; |
| llvm::Value* helper_ok = |
| builder.CreateCall(fnHelper, args, "helper_" + std::to_string(i)); |
| builder.CreateCondBr(helper_ok, helper_blocks[i], bail_out_block); |
| } |
| |
| // Return false on errors |
| builder.SetInsertPoint(bail_out_block); |
| builder.CreateRet(codegen->false_value()); |
| |
| // And true on success |
| builder.SetInsertPoint(helper_blocks[helper_blocks.size() - 1]); |
| builder.CreateRet(codegen->true_value()); |
| |
| *materialize_tuple_fn = codegen->FinalizeFunction(fn); |
| if (*materialize_tuple_fn == nullptr) { |
| return Status("Failed to finalize materialize_tuple_fn."); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status HdfsAvroScanner::CodegenReadRecord(const SchemaPath& path, |
| const AvroSchemaElement& record, int child_start, int child_end, |
| const HdfsScanNodeBase* node, LlvmCodeGen* codegen, void* void_builder, |
| llvm::Function* fn, llvm::BasicBlock* insert_before, llvm::BasicBlock* bail_out, |
| llvm::Value* this_val, llvm::Value* pool_val, llvm::Value* tuple_val, |
| llvm::Value* data_val, llvm::Value* data_end_val) { |
| RETURN_IF_ERROR(CheckSchema(record)); |
| DCHECK_EQ(record.schema->type, AVRO_RECORD); |
| llvm::LLVMContext& context = codegen->context(); |
| LlvmBuilder* builder = reinterpret_cast<LlvmBuilder*>(void_builder); |
| |
| // Codegen logic for parsing each field and, if necessary, populating a slot with the |
| // result. |
| |
| // Used to store result of ReadUnionType() call |
| llvm::Value* is_null_ptr = nullptr; |
| for (int i = child_start; i < child_end; ++i) { |
| const AvroSchemaElement* field = &record.children[i]; |
| int col_idx = i; |
| // If we're about to process the table-level columns, account for the partition keys |
| // when constructing 'path' |
| if (path.empty()) col_idx += node->num_partition_keys(); |
| SchemaPath new_path = path; |
| new_path.push_back(col_idx); |
| int slot_idx = node->GetMaterializedSlotIdx(new_path); |
| SlotDescriptor* slot_desc = (slot_idx == HdfsScanNodeBase::SKIP_COLUMN) ? |
| nullptr : node->materialized_slots()[slot_idx]; |
| |
| // Block that calls appropriate Read<Type> function |
| llvm::BasicBlock* read_field_block = |
| llvm::BasicBlock::Create(context, "read_field", fn, insert_before); |
| |
| // Block that handles a nullptr value. We fill this in below if the field is nullable, |
| // otherwise we leave this block nullptr. |
| llvm::BasicBlock* null_block = nullptr; |
| |
| // This is where we should end up after we're finished processing this field. Used to |
| // put the builder in the right place for the next field. |
| llvm::BasicBlock* end_field_block = |
| llvm::BasicBlock::Create(context, "end_field", fn, insert_before); |
| |
| if (field->nullable()) { |
| // Field could be null. Create conditional branch based on ReadUnionType result. |
| llvm::Function* read_union_fn = |
| codegen->GetFunction(IRFunction::READ_UNION_TYPE, false); |
| llvm::Value* null_union_pos_val = |
| codegen->GetI32Constant(field->null_union_position); |
| if (is_null_ptr == nullptr) { |
| is_null_ptr = codegen->CreateEntryBlockAlloca(*builder, codegen->bool_type(), |
| "is_null_ptr"); |
| } |
| llvm::Value* is_null_ptr_cast = |
| builder->CreateBitCast(is_null_ptr, codegen->ptr_type()); |
| llvm::Value* read_union_ok = builder->CreateCall(read_union_fn, |
| llvm::ArrayRef<llvm::Value*>( |
| {this_val, null_union_pos_val, data_val, data_end_val, is_null_ptr_cast}), |
| "read_union_ok"); |
| llvm::BasicBlock* read_union_ok_block = |
| llvm::BasicBlock::Create(context, "read_union_ok", fn, read_field_block); |
| builder->CreateCondBr(read_union_ok, read_union_ok_block, bail_out); |
| |
| builder->SetInsertPoint(read_union_ok_block); |
| null_block = llvm::BasicBlock::Create(context, "null_field", fn, end_field_block); |
| llvm::Value* is_null = builder->CreateLoad(is_null_ptr, "is_null"); |
| builder->CreateCondBr(is_null, null_block, read_field_block); |
| |
| // Write null field IR |
| builder->SetInsertPoint(null_block); |
| if (slot_idx != HdfsScanNodeBase::SKIP_COLUMN) { |
| slot_desc->CodegenSetNullIndicator( |
| codegen, builder, tuple_val, codegen->true_value()); |
| } |
| // LLVM requires all basic blocks to end with a terminating instruction |
| builder->CreateBr(end_field_block); |
| } else { |
| // Field is never null, read field unconditionally. |
| builder->CreateBr(read_field_block); |
| } |
| |
| // Write read_field_block IR |
| builder->SetInsertPoint(read_field_block); |
| llvm::Value* ret_val = nullptr; |
| if (field->schema->type == AVRO_RECORD) { |
| llvm::BasicBlock* insert_before_block = |
| (null_block != nullptr) ? null_block : end_field_block; |
| RETURN_IF_ERROR(CodegenReadRecord(new_path, *field, 0, field->children.size(), |
| node, codegen, builder, fn, |
| insert_before_block, bail_out, this_val, pool_val, tuple_val, data_val, |
| data_end_val)); |
| } else { |
| RETURN_IF_ERROR(CodegenReadScalar(*field, slot_desc, codegen, builder, |
| this_val, pool_val, tuple_val, data_val, data_end_val, &ret_val)); |
| } |
| builder->CreateCondBr(ret_val, end_field_block, bail_out); |
| |
| // Set insertion point for next field. |
| builder->SetInsertPoint(end_field_block); |
| } |
| return Status::OK(); |
| } |
| |
| Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element, |
| SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder, |
| llvm::Value* this_val, llvm::Value* pool_val, llvm::Value* tuple_val, |
| llvm::Value* data_val, llvm::Value* data_end_val, llvm::Value** ret_val) { |
| LlvmBuilder* builder = reinterpret_cast<LlvmBuilder*>(void_builder); |
| llvm::Function* read_field_fn; |
| switch (element.schema->type) { |
| case AVRO_BOOLEAN: |
| read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_BOOLEAN, false); |
| break; |
| case AVRO_DATE: |
| if (slot_desc != nullptr && slot_desc->type().type == TYPE_INT) { |
| read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_INT32, false); |
| } else { |
| read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_DATE, false); |
| } |
| break; |
| case AVRO_INT32: |
| if (slot_desc != nullptr && slot_desc->type().type == TYPE_DATE) { |
| read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_DATE, false); |
| } else { |
| read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_INT32, false); |
| } |
| break; |
| case AVRO_INT64: |
| read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_INT64, false); |
| break; |
| case AVRO_FLOAT: |
| read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_FLOAT, false); |
| break; |
| case AVRO_DOUBLE: |
| read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_DOUBLE, false); |
| break; |
| case AVRO_STRING: |
| case AVRO_BYTES: |
| if (slot_desc != nullptr && slot_desc->type().type == TYPE_VARCHAR) { |
| read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_VARCHAR, false); |
| } else if (slot_desc != nullptr && slot_desc->type().type == TYPE_CHAR) { |
| read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_CHAR, false); |
| } else { |
| read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_STRING, false); |
| } |
| break; |
| case AVRO_DECIMAL: |
| read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_DECIMAL, false); |
| break; |
| default: |
| return Status::Expected(Substitute( |
| "Failed to codegen MaterializeTuple() due to unsupported type: $0", |
| element.schema->type)); |
| } |
| |
| // Call appropriate ReadAvro<Type> function |
| llvm::Value* write_slot_val = builder->getFalse(); |
| llvm::Value* slot_type_val = builder->getInt32(0); |
| llvm::Value* opaque_slot_val = codegen->null_ptr_value(); |
| if (slot_desc != nullptr) { |
| // Field corresponds to a materialized column, fill in relevant arguments |
| write_slot_val = builder->getTrue(); |
| if (slot_desc->type().type == TYPE_DECIMAL) { |
| // ReadAvroDecimal() takes slot byte size instead of slot type |
| slot_type_val = builder->getInt32(slot_desc->type().GetByteSize()); |
| } else { |
| slot_type_val = builder->getInt32(slot_desc->type().type); |
| } |
| llvm::Value* slot_val = |
| builder->CreateStructGEP(nullptr, tuple_val, slot_desc->llvm_field_idx(), "slot"); |
| opaque_slot_val = |
| builder->CreateBitCast(slot_val, codegen->ptr_type(), "opaque_slot"); |
| } |
| |
| // NOTE: ReadAvroVarchar/Char has different signature than rest of read functions |
| if (slot_desc != nullptr && |
| (slot_desc->type().type == TYPE_VARCHAR || slot_desc->type().type == TYPE_CHAR)) { |
| // Need to pass an extra argument (the length) to the codegen function. |
| llvm::Value* fixed_len = builder->getInt32(slot_desc->type().len); |
| llvm::Value* read_field_args[] = {this_val, slot_type_val, fixed_len, data_val, |
| data_end_val, write_slot_val, opaque_slot_val, pool_val}; |
| *ret_val = builder->CreateCall(read_field_fn, read_field_args, "success"); |
| } else { |
| llvm::Value* read_field_args[] = {this_val, slot_type_val, data_val, data_end_val, |
| write_slot_val, opaque_slot_val, pool_val}; |
| *ret_val = builder->CreateCall(read_field_fn, read_field_args, "success"); |
| } |
| return Status::OK(); |
| } |
| |
| Status HdfsAvroScanner::CodegenDecodeAvroData(const HdfsScanNodeBase* node, |
| LlvmCodeGen* codegen, const vector<ScalarExpr*>& conjuncts, |
| llvm::Function** decode_avro_data_fn) { |
| llvm::Function* materialize_tuple_fn; |
| RETURN_IF_ERROR(CodegenMaterializeTuple(node, codegen, &materialize_tuple_fn)); |
| DCHECK(materialize_tuple_fn != nullptr); |
| |
| llvm::Function* fn = codegen->GetFunction(IRFunction::DECODE_AVRO_DATA, true); |
| |
| llvm::Function* init_tuple_fn; |
| RETURN_IF_ERROR(CodegenInitTuple(node, codegen, &init_tuple_fn)); |
| int replaced = codegen->ReplaceCallSites(fn, init_tuple_fn, "InitTuple"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| replaced = codegen->ReplaceCallSites(fn, materialize_tuple_fn, "MaterializeTuple"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| llvm::Function* eval_conjuncts_fn; |
| RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjuncts, &eval_conjuncts_fn)); |
| |
| replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, "EvalConjuncts"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| llvm::Function* copy_strings_fn; |
| RETURN_IF_ERROR(Tuple::CodegenCopyStrings( |
| codegen, *node->tuple_desc(), ©_strings_fn)); |
| replaced = codegen->ReplaceCallSites(fn, copy_strings_fn, "CopyStrings"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| int tuple_byte_size = node->tuple_desc()->byte_size(); |
| replaced = codegen->ReplaceCallSitesWithValue(fn, |
| codegen->GetI32Constant(tuple_byte_size), "tuple_byte_size"); |
| DCHECK_REPLACE_COUNT(replaced, 1); |
| |
| fn->setName("DecodeAvroData"); |
| *decode_avro_data_fn = codegen->FinalizeFunction(fn); |
| if (*decode_avro_data_fn == nullptr) { |
| return Status("Failed to finalize decode_avro_data_fn."); |
| } |
| return Status::OK(); |
| } |