// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/hdfs-avro-scanner.h"
#include <algorithm> // std::min
#include <avro/errors.h>
#include <avro/legacy.h>
#include <gutil/strings/substitute.h>
#include "codegen/llvm-codegen.h"
#include "exec/hdfs-scan-node.h"
#include "exec/read-write-util.h"
#include "exec/scanner-context.inline.h"
#include "runtime/raw-value.h"
#include "runtime/runtime-state.h"
#include "util/codec.h"
#include "util/decompress.h"
#include "util/runtime-profile-counters.h"
#include "util/test-info.h"
#include "common/names.h"
// Note: the Avro C++ library uses exceptions for error handling. Any Avro
// function that may throw an exception must be placed in a try/catch block.
using namespace impala;
using namespace strings;
const char* HdfsAvroScanner::LLVM_CLASS_NAME = "class.impala::HdfsAvroScanner";
const uint8_t HdfsAvroScanner::AVRO_VERSION_HEADER[4] = {'O', 'b', 'j', 1};
const string HdfsAvroScanner::AVRO_SCHEMA_KEY("avro.schema");
const string HdfsAvroScanner::AVRO_CODEC_KEY("avro.codec");
const string HdfsAvroScanner::AVRO_NULL_CODEC("null");
const string HdfsAvroScanner::AVRO_SNAPPY_CODEC("snappy");
const string HdfsAvroScanner::AVRO_DEFLATE_CODEC("deflate");
const string AVRO_MEM_LIMIT_EXCEEDED = "HdfsAvroScanner::$0() failed to allocate "
"$1 bytes for $2.";
#define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
static Status CheckSchema(const AvroSchemaElement& avro_schema) {
if (avro_schema.schema == nullptr) {
return Status("Missing Avro schema in scan node. This could be due to stale "
"metadata. Running 'invalidate metadata <tablename>' may resolve the problem.");
}
return Status::OK();
}
HdfsAvroScanner::HdfsAvroScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
: BaseSequenceScanner(scan_node, state) {
}
HdfsAvroScanner::HdfsAvroScanner()
: BaseSequenceScanner() {
DCHECK(TestInfo::is_test());
}
Status HdfsAvroScanner::Open(ScannerContext* context) {
RETURN_IF_ERROR(BaseSequenceScanner::Open(context));
RETURN_IF_ERROR(CheckSchema(scan_node_->avro_schema()));
return Status::OK();
}
Status HdfsAvroScanner::Codegen(HdfsScanNodeBase* node,
const vector<ScalarExpr*>& conjuncts, llvm::Function** decode_avro_data_fn) {
*decode_avro_data_fn = nullptr;
DCHECK(node->runtime_state()->ShouldCodegen());
LlvmCodeGen* codegen = node->runtime_state()->codegen();
DCHECK(codegen != nullptr);
RETURN_IF_ERROR(CodegenDecodeAvroData(node, codegen, conjuncts, decode_avro_data_fn));
DCHECK(*decode_avro_data_fn != nullptr);
return Status::OK();
}
BaseSequenceScanner::FileHeader* HdfsAvroScanner::AllocateFileHeader() {
AvroFileHeader* header = new AvroFileHeader();
header->template_tuple = template_tuple_;
return header;
}
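// Per the Avro container file spec, a file begins with the 4-byte magic 'O' 'b' 'j' 1,
// followed by the file metadata (an Avro-encoded map from string keys to bytes values)
// and a 16-byte sync marker that is repeated after every data block.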
Status HdfsAvroScanner::ReadFileHeader() {
DCHECK(only_parsing_header_);
avro_header_ = reinterpret_cast<AvroFileHeader*>(header_);
// Check version header
uint8_t* header;
RETURN_IF_FALSE(stream_->ReadBytes(
sizeof(AVRO_VERSION_HEADER), &header, &parse_status_));
if (memcmp(header, AVRO_VERSION_HEADER, sizeof(AVRO_VERSION_HEADER))) {
return Status(TErrorCode::AVRO_BAD_VERSION_HEADER,
stream_->filename(), ReadWriteUtil::HexDump(header, sizeof(AVRO_VERSION_HEADER)));
}
// Decode relevant metadata (encoded as Avro map)
RETURN_IF_ERROR(ParseMetadata());
// Read file sync marker
uint8_t* sync;
RETURN_IF_FALSE(stream_->ReadBytes(SYNC_HASH_SIZE, &sync, &parse_status_));
memcpy(header_->sync, sync, SYNC_HASH_SIZE);
header_->header_size = stream_->total_bytes_returned() - SYNC_HASH_SIZE;
// Transfer ownership so the memory remains valid for subsequent scanners that process
// the data portions of the file.
scan_node_->TransferToScanNodePool(template_tuple_pool_.get());
return Status::OK();
}
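// Avro encodes a map as a series of key/value blocks, each prefixed with a
// zigzag-encoded entry count; a block with count 0 terminates the map. The loop below
// mirrors that structure, extracting only the schema and codec entries.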
Status HdfsAvroScanner::ParseMetadata() {
header_->is_compressed = false;
header_->compression_type = THdfsCompression::NONE;
int64_t num_entries;
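  // ReadZLong() decodes Avro's long encoding: a base-128 varint (least-significant
  // group first) whose zigzag mapping is undone with decoded = (v >> 1) ^ -(v & 1).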
RETURN_IF_FALSE(stream_->ReadZLong(&num_entries, &parse_status_));
if (num_entries < 1) {
return Status(TErrorCode::AVRO_INVALID_METADATA_COUNT, stream_->filename(),
num_entries, stream_->file_offset());
}
while (num_entries != 0) {
DCHECK_GT(num_entries, 0);
for (int i = 0; i < num_entries; ++i) {
// Decode Avro string-type key
string key;
uint8_t* key_buf;
int64_t key_len;
RETURN_IF_FALSE(stream_->ReadZLong(&key_len, &parse_status_));
if (key_len < 0) {
return Status(TErrorCode::AVRO_INVALID_LENGTH, stream_->filename(), key_len,
stream_->file_offset());
}
RETURN_IF_FALSE(stream_->ReadBytes(key_len, &key_buf, &parse_status_));
key = string(reinterpret_cast<char*>(key_buf), key_len);
// Decode Avro bytes-type value
uint8_t* value;
int64_t value_len;
RETURN_IF_FALSE(stream_->ReadZLong(&value_len, &parse_status_));
if (value_len < 0) {
return Status(TErrorCode::AVRO_INVALID_LENGTH, stream_->filename(), value_len,
stream_->file_offset());
}
RETURN_IF_FALSE(stream_->ReadBytes(value_len, &value, &parse_status_));
if (key == AVRO_SCHEMA_KEY) {
avro_schema_t raw_file_schema;
int error = avro_schema_from_json_length(
reinterpret_cast<char*>(value), value_len, &raw_file_schema);
if (error != 0) {
stringstream ss;
ss << "Failed to parse file schema: " << avro_strerror();
return Status(ss.str());
}
AvroSchemaElement* file_schema = avro_header_->schema.get();
RETURN_IF_ERROR(AvroSchemaElement::ConvertSchema(raw_file_schema, file_schema));
RETURN_IF_ERROR(ResolveSchemas(scan_node_->avro_schema(), file_schema));
        // We currently codegen a function only for the table schema. If this file's
        // schema differs from the table schema, fall back to the interpreted path
        // instead of the codegen'd function.
avro_header_->use_codegend_decode_avro_data = avro_schema_equal(
scan_node_->avro_schema().schema, file_schema->schema);
} else if (key == AVRO_CODEC_KEY) {
string avro_codec(reinterpret_cast<char*>(value), value_len);
if (avro_codec != AVRO_NULL_CODEC) {
header_->is_compressed = true;
          // This scanner doesn't use header_->codec (Avro doesn't use the Hadoop codec
          // strings), but we fill it in for logging.
header_->codec = avro_codec;
if (avro_codec == AVRO_SNAPPY_CODEC) {
header_->compression_type = THdfsCompression::SNAPPY;
} else if (avro_codec == AVRO_DEFLATE_CODEC) {
header_->compression_type = THdfsCompression::DEFLATE;
} else {
return Status("Unknown Avro compression codec: " + avro_codec);
}
}
} else {
VLOG_ROW << "Skipping metadata entry: " << key;
}
}
RETURN_IF_FALSE(stream_->ReadZLong(&num_entries, &parse_status_));
if (num_entries < 0) {
return Status(TErrorCode::AVRO_INVALID_METADATA_COUNT, stream_->filename(),
num_entries, stream_->file_offset());
}
}
VLOG_FILE << stream_->filename() << ": "
<< (header_->is_compressed ? "compressed" : "not compressed");
if (header_->is_compressed) VLOG_FILE << header_->codec;
if (avro_header_->schema->children.empty()) {
return Status("Schema not found in file header metadata");
}
return Status::OK();
}
// Schema resolution is performed per materialized slot (meaning we don't perform schema
// resolution for non-materialized columns). For each slot, we traverse the table schema
// using the column path (i.e., the traversal is by ordinal). We simultaneously traverse
// the file schema using the table schema's field names. The final field should exist in
// both schemas and be promotable to the slot type. If the file schema is missing a field,
// we check for a default value in the table schema and use that instead.
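// For example, given a table schema (id int, name string) and a file written before
// 'name' was added, 'id' is resolved by looking up "id" in the file schema, while
// 'name' falls back to its default value from the table schema (if one exists).
//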
// TODO: test unresolvable schemas
// TODO: improve error messages
Status HdfsAvroScanner::ResolveSchemas(const AvroSchemaElement& table_root,
AvroSchemaElement* file_root) {
if (table_root.schema->type != AVRO_RECORD) return Status("Table schema is not a record");
if (file_root->schema->type != AVRO_RECORD) return Status("File schema is not a record");
// Associate each slot descriptor with a field in the file schema, or fill in the
// template tuple with a default value from the table schema.
for (SlotDescriptor* slot_desc: scan_node_->materialized_slots()) {
// Traverse the column path, simultaneously traversing the table schema by ordinal and
// the file schema by field name from the table schema.
const SchemaPath& path = slot_desc->col_path();
const AvroSchemaElement* table_record = &table_root;
AvroSchemaElement* file_record = file_root;
for (int i = 0; i < path.size(); ++i) {
int table_field_idx = i > 0 ? path[i] : path[i] - scan_node_->num_partition_keys();
int num_fields = table_record->children.size();
if (table_field_idx >= num_fields) {
// TODO: add path to error message (and elsewhere)
return Status(TErrorCode::AVRO_MISSING_FIELD, table_field_idx, num_fields);
}
const char* field_name =
avro_schema_record_field_name(table_record->schema, table_field_idx);
int file_field_idx =
avro_schema_record_field_get_index(file_record->schema, field_name);
if (file_field_idx < 0) {
// This field doesn't exist in the file schema. Check if there is a default value.
avro_datum_t default_value =
avro_schema_record_field_default(table_record->schema, table_field_idx);
if (default_value == nullptr) {
return Status(TErrorCode::AVRO_MISSING_DEFAULT, field_name);
}
RETURN_IF_ERROR(WriteDefaultValue(slot_desc, default_value, field_name));
DCHECK_EQ(i, path.size() - 1) <<
"WriteDefaultValue() doesn't support default records yet, should have failed";
continue;
}
const AvroSchemaElement& table_field = table_record->children[table_field_idx];
AvroSchemaElement& file_field = file_record->children[file_field_idx];
RETURN_IF_ERROR(VerifyTypesMatch(table_field, file_field, field_name));
if (i != path.size() - 1) {
// All but the last index in 'path' should be a record field
if (table_record->schema->type != AVRO_RECORD) {
return Status(TErrorCode::AVRO_NOT_A_RECORD, field_name);
} else {
DCHECK_EQ(file_record->schema->type, AVRO_RECORD);
}
table_record = &table_field;
file_record = &file_field;
} else {
// This should be the field corresponding to 'slot_desc'. Check that slot_desc can
// be resolved to the table's Avro schema.
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, table_field.schema));
file_field.slot_desc = slot_desc;
}
}
}
return Status::OK();
}
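// Writes 'default_value' into the slot's position in the template tuple, creating the
// template tuple on first use. Rows from files missing this field then pick the value
// up from the template tuple instead of decoding it from the data.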
Status HdfsAvroScanner::WriteDefaultValue(
SlotDescriptor* slot_desc, avro_datum_t default_value, const char* field_name) {
if (avro_header_->template_tuple == nullptr) {
if (template_tuple_ != nullptr) {
avro_header_->template_tuple = template_tuple_;
} else {
avro_header_->template_tuple =
Tuple::Create(tuple_byte_size_, template_tuple_pool_.get());
}
}
switch (default_value->type) {
case AVRO_BOOLEAN: {
      // We don't call VerifyTypesMatch() above the switch statement so we don't want to
      // call it in the default case (since VerifyTypesMatch() can't handle every type
      // either, and we want to return the correct error message).
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
int8_t v;
if (avro_boolean_get(default_value, &v)) DCHECK(false);
RawValue::Write(&v, avro_header_->template_tuple, slot_desc, nullptr);
break;
}
case AVRO_INT32:
case AVRO_DATE: {
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
int32_t v;
if (avro_int32_get(default_value, &v)) DCHECK(false);
RawValue::Write(&v, avro_header_->template_tuple, slot_desc, nullptr);
break;
}
case AVRO_INT64: {
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
int64_t v;
if (avro_int64_get(default_value, &v)) DCHECK(false);
RawValue::Write(&v, avro_header_->template_tuple, slot_desc, nullptr);
break;
}
case AVRO_FLOAT: {
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
float v;
if (avro_float_get(default_value, &v)) DCHECK(false);
RawValue::Write(&v, avro_header_->template_tuple, slot_desc, nullptr);
break;
}
case AVRO_DOUBLE: {
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
double v;
if (avro_double_get(default_value, &v)) DCHECK(false);
RawValue::Write(&v, avro_header_->template_tuple, slot_desc, nullptr);
break;
}
case AVRO_STRING:
case AVRO_BYTES: {
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
char* v;
if (avro_string_get(default_value, &v)) DCHECK(false);
StringValue sv(v);
RawValue::Write(&sv, avro_header_->template_tuple, slot_desc,
template_tuple_pool_.get());
break;
}
case AVRO_NULL:
RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
avro_header_->template_tuple->SetNull(slot_desc->null_indicator_offset());
break;
default:
return Status(TErrorCode::AVRO_UNSUPPORTED_DEFAULT_VALUE, field_name,
avro_type_name(default_value->type));
}
return Status::OK();
}
Status HdfsAvroScanner::VerifyTypesMatch(const AvroSchemaElement& table_schema,
const AvroSchemaElement& file_schema, const string& field_name) {
if (!table_schema.nullable() && file_schema.nullable()) {
// Use ErrorMsg because corresponding Status ctor is ambiguous
ErrorMsg msg(TErrorCode::AVRO_NULLABILITY_MISMATCH, field_name);
return Status(msg);
}
if (file_schema.schema->type == AVRO_NULL) {
if (table_schema.schema->type == AVRO_NULL || table_schema.nullable()) {
return Status::OK();
} else {
return Status(TErrorCode::AVRO_SCHEMA_RESOLUTION_ERROR, field_name,
avro_type_name(table_schema.schema->type),
avro_type_name(file_schema.schema->type));
}
}
// Can't convert records to ColumnTypes, check here instead of below
// TODO: update if/when we have TYPE_STRUCT primitive type
if ((table_schema.schema->type == AVRO_RECORD) ^
(file_schema.schema->type == AVRO_RECORD)) {
return Status(TErrorCode::AVRO_SCHEMA_RESOLUTION_ERROR, field_name,
avro_type_name(table_schema.schema->type),
avro_type_name(file_schema.schema->type));
} else if (table_schema.schema->type == AVRO_RECORD) {
DCHECK_EQ(file_schema.schema->type, AVRO_RECORD);
return Status::OK();
}
ColumnType reader_type;
RETURN_IF_ERROR(AvroSchemaToColumnType(table_schema.schema, field_name, &reader_type));
ColumnType writer_type;
RETURN_IF_ERROR(AvroSchemaToColumnType(file_schema.schema, field_name, &writer_type));
bool match = VerifyTypesMatch(reader_type, writer_type);
if (match) return Status::OK();
return Status(TErrorCode::AVRO_SCHEMA_RESOLUTION_ERROR, field_name,
avro_type_name(table_schema.schema->type),
avro_type_name(file_schema.schema->type));
}
Status HdfsAvroScanner::VerifyTypesMatch(SlotDescriptor* slot_desc, avro_obj_t* schema) {
// TODO: make this work for nested fields
const string& col_name =
scan_node_->hdfs_table()->col_descs()[slot_desc->col_pos()].name();
// All Impala types are nullable
if (schema->type == AVRO_NULL) return Status::OK();
// Can't convert records to ColumnTypes, check here instead of below
// TODO: update if/when we have TYPE_STRUCT primitive type
if (schema->type == AVRO_RECORD) {
return Status(TErrorCode::AVRO_SCHEMA_METADATA_MISMATCH, col_name,
slot_desc->type().DebugString(), avro_type_name(schema->type));
}
ColumnType file_type;
RETURN_IF_ERROR(AvroSchemaToColumnType(schema, col_name, &file_type));
bool match = VerifyTypesMatch(slot_desc->type(), file_type);
if (match) return Status::OK();
return Status(TErrorCode::AVRO_SCHEMA_METADATA_MISMATCH, col_name,
slot_desc->type().DebugString(), avro_type_name(schema->type));
}
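// Implements the numeric promotions from Avro's schema resolution rules: a writer's
// int may be read as int, long, float, or double; a long as long, float, or double;
// and a float as float or double. String and bytes are interchangeable, decimals must
// match in precision and scale, and DATE (encoded as an int) follows the int
// promotions.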
bool HdfsAvroScanner::VerifyTypesMatch(
const ColumnType& reader_type, const ColumnType& writer_type) {
switch (writer_type.type) {
case TYPE_DECIMAL:
if (reader_type.type != TYPE_DECIMAL) return false;
if (reader_type.scale != writer_type.scale) return false;
if (reader_type.precision != writer_type.precision) return false;
return true;
case TYPE_STRING: return reader_type.IsStringType();
case TYPE_INT:
case TYPE_DATE:
switch(reader_type.type) {
case TYPE_INT:
case TYPE_DATE:
// Type promotion
case TYPE_BIGINT:
case TYPE_FLOAT:
case TYPE_DOUBLE:
return true;
default:
return false;
}
case TYPE_BIGINT:
switch(reader_type.type) {
case TYPE_BIGINT:
// Type promotion
case TYPE_FLOAT:
case TYPE_DOUBLE:
return true;
default:
return false;
}
case TYPE_FLOAT:
switch(reader_type.type) {
case TYPE_FLOAT:
// Type promotion
case TYPE_DOUBLE:
return true;
default:
return false;
}
case TYPE_DOUBLE: return reader_type.type == TYPE_DOUBLE;
case TYPE_BOOLEAN: return reader_type.type == TYPE_BOOLEAN;
default:
DCHECK(false) << "NYI: " << writer_type.DebugString();
return false;
}
}
Status HdfsAvroScanner::InitNewRange() {
DCHECK(header_ != nullptr);
only_parsing_header_ = false;
avro_header_ = reinterpret_cast<AvroFileHeader*>(header_);
template_tuple_ = avro_header_->template_tuple;
if (header_->is_compressed) {
RETURN_IF_ERROR(UpdateDecompressor(header_->compression_type));
}
if (avro_header_->use_codegend_decode_avro_data) {
codegend_decode_avro_data_ = reinterpret_cast<DecodeAvroDataFn>(
scan_node_->GetCodegenFn(THdfsFileFormat::AVRO));
}
if (codegend_decode_avro_data_ == nullptr) {
scan_node_->IncNumScannersCodegenDisabled();
} else {
VLOG(2) << "HdfsAvroScanner (node_id=" << scan_node_->id()
<< ") using llvm codegend functions.";
scan_node_->IncNumScannersCodegenEnabled();
}
return Status::OK();
}
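// Each Avro data block consists of a zigzag-encoded record count, the serialized
// (possibly compressed) size of the block in bytes, the block data itself, and a
// trailing 16-byte sync marker.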
Status HdfsAvroScanner::ProcessRange(RowBatch* row_batch) {
  // Process blocks until we hit eos, the limit, or the batch fills up. Check
  // AtCapacity() at the end of the loop to guarantee that we process at least one row
  // so that we make progress even if the batch starts off with AtCapacity() == true,
  // which can happen if the tuple buffer is > 8MB.
DCHECK_GT(row_batch->capacity(), row_batch->num_rows());
while (!eos_ && !scan_node_->ReachedLimitShared()) {
if (record_pos_ == num_records_in_block_) {
// Read new data block
RETURN_IF_FALSE(stream_->ReadZLong(&num_records_in_block_, &parse_status_));
if (num_records_in_block_ < 0) {
return Status(TErrorCode::AVRO_INVALID_RECORD_COUNT, stream_->filename(),
num_records_in_block_, stream_->file_offset());
}
int64_t compressed_size;
RETURN_IF_FALSE(stream_->ReadZLong(&compressed_size, &parse_status_));
if (compressed_size < 0) {
return Status(TErrorCode::AVRO_INVALID_COMPRESSED_SIZE, stream_->filename(),
compressed_size, stream_->file_offset());
}
uint8_t* compressed_data;
RETURN_IF_FALSE(stream_->ReadBytes(
compressed_size, &compressed_data, &parse_status_));
if (header_->is_compressed) {
if (header_->compression_type == THdfsCompression::SNAPPY) {
// Snappy-compressed data block includes trailing 4-byte checksum,
// decompressor_ doesn't expect this
compressed_size -= SnappyDecompressor::TRAILING_CHECKSUM_LEN;
}
SCOPED_TIMER(decompress_timer_);
RETURN_IF_ERROR(decompressor_->ProcessBlock(false, compressed_size,
compressed_data, &data_block_len_, &data_block_));
} else {
data_block_ = compressed_data;
data_block_len_ = compressed_size;
}
data_block_end_ = data_block_ + data_block_len_;
record_pos_ = 0;
}
int64_t prev_record_pos = record_pos_;
int block_start_row = row_batch->num_rows();
// Process the remaining data in the current block. Always process at least one row
// to ensure we make progress even if the batch starts off with AtCapacity() == true.
DCHECK_GT(row_batch->capacity(), row_batch->num_rows());
while (record_pos_ != num_records_in_block_) {
SCOPED_TIMER(scan_node_->materialize_tuple_timer());
Tuple* tuple = tuple_;
TupleRow* tuple_row = row_batch->GetRow(row_batch->AddRow());
int max_tuples = row_batch->capacity() - row_batch->num_rows();
max_tuples = min<int64_t>(max_tuples, num_records_in_block_ - record_pos_);
int num_to_commit;
if (scan_node_->materialized_slots().empty()) {
// No slots to materialize (e.g. count(*)), no need to decode data
num_to_commit = WriteTemplateTuples(tuple_row, max_tuples);
} else if (codegend_decode_avro_data_ != nullptr) {
num_to_commit = codegend_decode_avro_data_(this, max_tuples,
row_batch->tuple_data_pool(), &data_block_, data_block_end_, tuple, tuple_row);
} else {
num_to_commit = DecodeAvroData(max_tuples, row_batch->tuple_data_pool(),
&data_block_, data_block_end_, tuple, tuple_row);
}
RETURN_IF_ERROR(parse_status_);
RETURN_IF_ERROR(CommitRows(num_to_commit, row_batch));
record_pos_ += max_tuples;
COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
if (row_batch->AtCapacity() || scan_node_->ReachedLimitShared()) break;
}
if (record_pos_ == num_records_in_block_) {
if (decompressor_.get() != nullptr && !decompressor_->reuse_output_buffer()) {
if (prev_record_pos == 0 && row_batch->num_rows() == block_start_row) {
        // Did not return any rows from the current block in this or a previous
        // ProcessRange() call - we can recycle the memory. This optimization depends on
        // passing keep_current_chunk = false to the AcquireData() call below, so that
        // the current chunk only contains data for the current Avro block.
data_buffer_pool_->Clear();
} else {
// Returned rows may reference data buffers - need to attach to batch.
row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
}
}
RETURN_IF_ERROR(ReadSync());
}
if (row_batch->AtCapacity()) break;
}
return Status::OK();
}
bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema,
MemPool* pool, uint8_t** data, uint8_t* data_end, Tuple* tuple) {
DCHECK_EQ(record_schema.schema->type, AVRO_RECORD);
for (const AvroSchemaElement& element: record_schema.children) {
DCHECK_LE(*data, data_end);
const SlotDescriptor* slot_desc = element.slot_desc;
bool write_slot = false;
void* slot = nullptr;
PrimitiveType slot_type = INVALID_TYPE;
if (slot_desc != nullptr) {
write_slot = true;
slot = tuple->GetSlot(slot_desc->tuple_offset());
slot_type = slot_desc->type().type;
}
avro_type_t type = element.schema->type;
if (element.nullable()) {
bool is_null;
if (!ReadUnionType(element.null_union_position, data, data_end, &is_null)) {
return false;
}
if (is_null) type = AVRO_NULL;
}
bool success;
switch (type) {
case AVRO_NULL:
if (slot_desc != nullptr) tuple->SetNull(slot_desc->null_indicator_offset());
success = true;
break;
case AVRO_BOOLEAN:
success = ReadAvroBoolean(slot_type, data, data_end, write_slot, slot, pool);
break;
case AVRO_DATE:
case AVRO_INT32:
if (slot_type == TYPE_DATE) {
success = ReadAvroDate(slot_type, data, data_end, write_slot, slot, pool);
} else {
success = ReadAvroInt32(slot_type, data, data_end, write_slot, slot, pool);
}
break;
case AVRO_INT64:
success = ReadAvroInt64(slot_type, data, data_end, write_slot, slot, pool);
break;
case AVRO_FLOAT:
success = ReadAvroFloat(slot_type, data, data_end, write_slot, slot, pool);
break;
case AVRO_DOUBLE:
success = ReadAvroDouble(slot_type, data, data_end, write_slot, slot, pool);
break;
case AVRO_STRING:
case AVRO_BYTES:
if (slot_desc != nullptr && slot_desc->type().type == TYPE_VARCHAR) {
success = ReadAvroVarchar(slot_type, slot_desc->type().len, data, data_end,
write_slot, slot, pool);
} else if (slot_desc != nullptr && slot_desc->type().type == TYPE_CHAR) {
success = ReadAvroChar(slot_type, slot_desc->type().len, data, data_end,
write_slot, slot, pool);
} else {
success = ReadAvroString(slot_type, data, data_end, write_slot, slot, pool);
}
break;
case AVRO_DECIMAL: {
int slot_byte_size = 0;
if (slot_desc != nullptr) {
DCHECK_EQ(slot_type, TYPE_DECIMAL);
slot_byte_size = slot_desc->type().GetByteSize();
}
success = ReadAvroDecimal(slot_byte_size, data, data_end, write_slot, slot, pool);
break;
}
case AVRO_RECORD:
success = MaterializeTuple(element, pool, data, data_end, tuple);
break;
default:
success = false;
DCHECK(false) << "Unsupported SchemaElement: " << type;
}
if (UNLIKELY(!success)) {
DCHECK(!parse_status_.ok());
return false;
}
}
return true;
}
void HdfsAvroScanner::SetStatusCorruptData(TErrorCode::type error_code) {
DCHECK(parse_status_.ok());
if (TestInfo::is_test()) {
parse_status_ = Status(error_code, "test file", 123);
} else {
parse_status_ = Status(error_code, stream_->filename(), stream_->file_offset());
}
}
void HdfsAvroScanner::SetStatusInvalidValue(TErrorCode::type error_code, int64_t len) {
DCHECK(parse_status_.ok());
if (TestInfo::is_test()) {
parse_status_ = Status(error_code, "test file", len, 123);
} else {
parse_status_ = Status(error_code, stream_->filename(), len, stream_->file_offset());
}
}
void HdfsAvroScanner::SetStatusValueOverflow(TErrorCode::type error_code, int64_t len,
int64_t limit) {
DCHECK(parse_status_.ok());
if (TestInfo::is_test()) {
parse_status_ = Status(error_code, "test file", len, limit, 123);
} else {
parse_status_ = Status(error_code, stream_->filename(), len, limit,
stream_->file_offset());
}
}
// This function produces a codegen'd function equivalent to MaterializeTuple() but
// optimized for the table schema. Via the helper functions CodegenReadRecord() and
// CodegenReadScalar(), it eliminates the conditionals needed to interpret the type of
// each schema element at runtime, instead generating straight-line code specialized to
// each element.
//
// To avoid overly long codegen times for wide schemas, this function generates one
// helper function per 200 columns, plus a single function that calls them all.
//
// Example output with 'select count(*) from tpch_avro.region':
//
// define i1 @MaterializeTuple-helper0(%"class.impala::HdfsAvroScanner"* %this, %"struct.impala::AvroSchemaElement"* %record_schema, %"class.impala::MemPool"* %pool, i8** %data, i8* %data_end, %"class.impala::Tuple"* %tuple) #34 {
// entry:
// %is_null_ptr = alloca i1
// %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple to <{}>*
// %0 = bitcast i1* %is_null_ptr to i8*
// %read_union_ok = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPhS1_Pb(%"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data, i8* %data_end, i8* %0)
// br i1 %read_union_ok, label %read_union_ok1, label %bail_out
//
// read_union_ok1: ; preds = %entry
// %is_null = load i1, i1* %is_null_ptr
// br i1 %is_null, label %null_field, label %read_field
//
// read_field: ; preds = %read_union_ok1
// %success = call i1 @_ZN6impala15HdfsAvroScanner13ReadAvroInt32ENS_13PrimitiveTypeEPPhS2_bPvPNS_7MemPoolE(%"class.impala::HdfsAvroScanner"* %this, i32 0, i8** %data, i8* %data_end, i1 false, i8* null, %"class.impala::MemPool"* %pool)
// br i1 %success, label %end_field, label %bail_out
//
// null_field: ; preds = %read_union_ok1
// br label %end_field
//
// end_field: ; preds = %read_field, %null_field
// %1 = bitcast i1* %is_null_ptr to i8*
// %read_union_ok4 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPhS1_Pb(%"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data, i8* %data_end, i8* %1)
// br i1 %read_union_ok4, label %read_union_ok5, label %bail_out
//
// read_union_ok5: ; preds = %end_field
// %is_null7 = load i1, i1* %is_null_ptr
// br i1 %is_null7, label %null_field6, label %read_field2
//
// read_field2: ; preds = %read_union_ok5
// %success8 = call i1 @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhS2_bPvPNS_7MemPoolE(%"class.impala::HdfsAvroScanner"* %this, i32 0, i8** %data, i8* %data_end, i1 false, i8* null, %"class.impala::MemPool"* %pool)
// br i1 %success8, label %end_field3, label %bail_out
//
// null_field6: ; preds = %read_union_ok5
// br label %end_field3
//
// end_field3: ; preds = %read_field2, %null_field6
// %2 = bitcast i1* %is_null_ptr to i8*
// %read_union_ok11 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPhS1_Pb(%"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data, i8* %data_end, i8* %2)
// br i1 %read_union_ok11, label %read_union_ok12, label %bail_out
//
// read_union_ok12: ; preds = %end_field3
// %is_null14 = load i1, i1* %is_null_ptr
// br i1 %is_null14, label %null_field13, label %read_field9
//
// read_field9: ; preds = %read_union_ok12
// %success15 = call i1 @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhS2_bPvPNS_7MemPoolE(%"class.impala::HdfsAvroScanner"* %this, i32 0, i8** %data, i8* %data_end, i1 false, i8* null, %"class.impala::MemPool"* %pool)
// br i1 %success15, label %end_field10, label %bail_out
//
// null_field13: ; preds = %read_union_ok12
// br label %end_field10
//
// end_field10: ; preds = %read_field9, %null_field13
// ret i1 true
//
// bail_out: ; preds = %read_field9, %end_field3, %read_field2, %end_field, %read_field, %entry
// ret i1 false
// }
//
// define i1 @MaterializeTuple(%"class.impala::HdfsAvroScanner"* %this, %"struct.impala::AvroSchemaElement"* %record_schema, %"class.impala::MemPool"* %pool, i8** %data, i8* %data_end, %"class.impala::Tuple"* %tuple) #34 {
// entry:
// %helper_01 = call i1 @MaterializeTuple-helper0(%"class.impala::HdfsAvroScanner"* %this, %"struct.impala::AvroSchemaElement"* %record_schema, %"class.impala::MemPool"* %pool, i8** %data, i8* %data_end, %"class.impala::Tuple"* %tuple)
// br i1 %helper_01, label %helper_0, label %bail_out
//
// helper_0: ; preds = %entry
// ret i1 true
//
// bail_out: ; preds = %entry
// ret i1 false
// }
Status HdfsAvroScanner::CodegenMaterializeTuple(const HdfsScanNodeBase* node,
LlvmCodeGen* codegen, llvm::Function** materialize_tuple_fn) {
llvm::LLVMContext& context = codegen->context();
LlvmBuilder builder(context);
llvm::PointerType* this_ptr_type = codegen->GetStructPtrType<HdfsAvroScanner>();
TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc());
llvm::StructType* tuple_type = tuple_desc->GetLlvmStruct(codegen);
if (tuple_type == nullptr) return Status("Could not generate tuple struct.");
llvm::Type* tuple_ptr_type = llvm::PointerType::get(tuple_type, 0);
llvm::PointerType* tuple_opaque_ptr_type = codegen->GetStructPtrType<Tuple>();
llvm::Type* data_ptr_type = codegen->ptr_ptr_type(); // char**
llvm::Type* mempool_type = codegen->GetStructPtrType<MemPool>();
llvm::Type* schema_element_type = codegen->GetStructPtrType<AvroSchemaElement>();
// Schema can be null if metadata is stale. See test in
// queries/QueryTest/avro-schema-changes.test.
RETURN_IF_ERROR(CheckSchema(node->avro_schema()));
int num_children = node->avro_schema().children.size();
if (num_children == 0) {
return Status("Invalid Avro record schema: contains no children.");
}
  // We handle 'step_size' columns per helper function. The value was determined
  // empirically: too many helper functions and overly long helper functions both
  // increase LLVM optimization time.
  const int step_size = 200;
std::vector<llvm::Function*> helper_functions;
  // The prototype is reused for each generated function by renaming it via SetName().
LlvmCodeGen::FnPrototype prototype(codegen, "", codegen->bool_type());
prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("record_schema", schema_element_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("pool", mempool_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("data", data_ptr_type));
prototype.AddArgument(LlvmCodeGen::NamedVariable("data_end", codegen->ptr_type()));
prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_opaque_ptr_type));
// Generate helper functions for every step_size columns.
for (int i = 0; i < num_children; i += step_size) {
prototype.SetName("MaterializeTuple-helper" + std::to_string(i));
llvm::Value* args[6];
llvm::Function* helper_fn = prototype.GeneratePrototype(&builder, args);
llvm::Value* this_val = args[0];
// llvm::Value* record_schema_val = args[1]; // don't need this
llvm::Value* pool_val = args[2];
llvm::Value* data_val = args[3];
llvm::Value* data_end_val = args[4];
llvm::Value* opaque_tuple_val = args[5];
llvm::Value* tuple_val =
builder.CreateBitCast(opaque_tuple_val, tuple_ptr_type, "tuple_ptr");
// Create a bail out block to handle decoding failures.
llvm::BasicBlock* bail_out_block =
llvm::BasicBlock::Create(context, "bail_out", helper_fn, nullptr);
Status status = CodegenReadRecord(
SchemaPath(), node->avro_schema(), i, std::min(num_children, i + step_size),
node, codegen, &builder, helper_fn, bail_out_block,
bail_out_block, this_val, pool_val, tuple_val, data_val, data_end_val);
if (!status.ok()) {
VLOG_QUERY << status.GetDetail();
return status;
}
// Returns true on successful decoding.
builder.CreateRet(codegen->true_value());
// Returns false on decoding errors.
builder.SetInsertPoint(bail_out_block);
builder.CreateRet(codegen->false_value());
if (codegen->FinalizeFunction(helper_fn) == nullptr) {
return Status("Failed to finalize helper_fn.");
}
helper_functions.push_back(helper_fn);
}
// Actual MaterializeTuple. Call all the helper functions.
{
llvm::Value* args[6];
prototype.SetName("MaterializeTuple");
llvm::Function* fn = prototype.GeneratePrototype(&builder, args);
// These are the blocks that we go to after the helper runs.
std::vector<llvm::BasicBlock*> helper_blocks;
for (int i = 0; i < helper_functions.size(); ++i) {
llvm::BasicBlock* helper_block =
llvm::BasicBlock::Create(context, "helper_" + std::to_string(i), fn, nullptr);
helper_blocks.push_back(helper_block);
}
// Block for failures
llvm::BasicBlock* bail_out_block =
llvm::BasicBlock::Create(context, "bail_out", fn, nullptr);
// Call the helpers.
for (int i = 0; i < helper_functions.size(); ++i) {
if (i != 0) builder.SetInsertPoint(helper_blocks[i - 1]);
llvm::Function* fnHelper = helper_functions[i];
llvm::Value* helper_ok =
builder.CreateCall(fnHelper, args, "helper_" + std::to_string(i));
builder.CreateCondBr(helper_ok, helper_blocks[i], bail_out_block);
}
// Return false on errors
builder.SetInsertPoint(bail_out_block);
builder.CreateRet(codegen->false_value());
// And true on success
builder.SetInsertPoint(helper_blocks[helper_blocks.size() - 1]);
builder.CreateRet(codegen->true_value());
*materialize_tuple_fn = codegen->FinalizeFunction(fn);
if (*materialize_tuple_fn == nullptr) {
return Status("Failed to finalize materialize_tuple_fn.");
}
}
return Status::OK();
}
Status HdfsAvroScanner::CodegenReadRecord(const SchemaPath& path,
const AvroSchemaElement& record, int child_start, int child_end,
const HdfsScanNodeBase* node, LlvmCodeGen* codegen, void* void_builder,
llvm::Function* fn, llvm::BasicBlock* insert_before, llvm::BasicBlock* bail_out,
llvm::Value* this_val, llvm::Value* pool_val, llvm::Value* tuple_val,
llvm::Value* data_val, llvm::Value* data_end_val) {
RETURN_IF_ERROR(CheckSchema(record));
DCHECK_EQ(record.schema->type, AVRO_RECORD);
llvm::LLVMContext& context = codegen->context();
LlvmBuilder* builder = reinterpret_cast<LlvmBuilder*>(void_builder);
// Codegen logic for parsing each field and, if necessary, populating a slot with the
// result.
// Used to store result of ReadUnionType() call
llvm::Value* is_null_ptr = nullptr;
for (int i = child_start; i < child_end; ++i) {
const AvroSchemaElement* field = &record.children[i];
int col_idx = i;
// If we're about to process the table-level columns, account for the partition keys
// when constructing 'path'
if (path.empty()) col_idx += node->num_partition_keys();
SchemaPath new_path = path;
new_path.push_back(col_idx);
int slot_idx = node->GetMaterializedSlotIdx(new_path);
SlotDescriptor* slot_desc = (slot_idx == HdfsScanNodeBase::SKIP_COLUMN) ?
nullptr : node->materialized_slots()[slot_idx];
// Block that calls appropriate Read<Type> function
llvm::BasicBlock* read_field_block =
llvm::BasicBlock::Create(context, "read_field", fn, insert_before);
    // Block that handles an Avro null value. We fill this in below if the field is
    // nullable; otherwise we leave this block as nullptr.
llvm::BasicBlock* null_block = nullptr;
// This is where we should end up after we're finished processing this field. Used to
// put the builder in the right place for the next field.
llvm::BasicBlock* end_field_block =
llvm::BasicBlock::Create(context, "end_field", fn, insert_before);
if (field->nullable()) {
// Field could be null. Create conditional branch based on ReadUnionType result.
llvm::Function* read_union_fn =
codegen->GetFunction(IRFunction::READ_UNION_TYPE, false);
llvm::Value* null_union_pos_val =
codegen->GetI32Constant(field->null_union_position);
if (is_null_ptr == nullptr) {
is_null_ptr = codegen->CreateEntryBlockAlloca(*builder, codegen->bool_type(),
"is_null_ptr");
}
llvm::Value* is_null_ptr_cast =
builder->CreateBitCast(is_null_ptr, codegen->ptr_type());
llvm::Value* read_union_ok = builder->CreateCall(read_union_fn,
llvm::ArrayRef<llvm::Value*>(
{this_val, null_union_pos_val, data_val, data_end_val, is_null_ptr_cast}),
"read_union_ok");
llvm::BasicBlock* read_union_ok_block =
llvm::BasicBlock::Create(context, "read_union_ok", fn, read_field_block);
builder->CreateCondBr(read_union_ok, read_union_ok_block, bail_out);
builder->SetInsertPoint(read_union_ok_block);
null_block = llvm::BasicBlock::Create(context, "null_field", fn, end_field_block);
llvm::Value* is_null = builder->CreateLoad(is_null_ptr, "is_null");
builder->CreateCondBr(is_null, null_block, read_field_block);
// Write null field IR
builder->SetInsertPoint(null_block);
if (slot_idx != HdfsScanNodeBase::SKIP_COLUMN) {
slot_desc->CodegenSetNullIndicator(
codegen, builder, tuple_val, codegen->true_value());
}
// LLVM requires all basic blocks to end with a terminating instruction
builder->CreateBr(end_field_block);
} else {
// Field is never null, read field unconditionally.
builder->CreateBr(read_field_block);
}
// Write read_field_block IR
builder->SetInsertPoint(read_field_block);
llvm::Value* ret_val = nullptr;
if (field->schema->type == AVRO_RECORD) {
llvm::BasicBlock* insert_before_block =
(null_block != nullptr) ? null_block : end_field_block;
RETURN_IF_ERROR(CodegenReadRecord(new_path, *field, 0, field->children.size(),
node, codegen, builder, fn,
insert_before_block, bail_out, this_val, pool_val, tuple_val, data_val,
data_end_val));
} else {
RETURN_IF_ERROR(CodegenReadScalar(*field, slot_desc, codegen, builder,
this_val, pool_val, tuple_val, data_val, data_end_val, &ret_val));
}
builder->CreateCondBr(ret_val, end_field_block, bail_out);
// Set insertion point for next field.
builder->SetInsertPoint(end_field_block);
}
return Status::OK();
}
Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder,
llvm::Value* this_val, llvm::Value* pool_val, llvm::Value* tuple_val,
llvm::Value* data_val, llvm::Value* data_end_val, llvm::Value** ret_val) {
LlvmBuilder* builder = reinterpret_cast<LlvmBuilder*>(void_builder);
llvm::Function* read_field_fn;
switch (element.schema->type) {
case AVRO_BOOLEAN:
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_BOOLEAN, false);
break;
case AVRO_DATE:
if (slot_desc != nullptr && slot_desc->type().type == TYPE_INT) {
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_INT32, false);
} else {
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_DATE, false);
}
break;
case AVRO_INT32:
if (slot_desc != nullptr && slot_desc->type().type == TYPE_DATE) {
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_DATE, false);
} else {
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_INT32, false);
}
break;
case AVRO_INT64:
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_INT64, false);
break;
case AVRO_FLOAT:
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_FLOAT, false);
break;
case AVRO_DOUBLE:
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_DOUBLE, false);
break;
case AVRO_STRING:
case AVRO_BYTES:
if (slot_desc != nullptr && slot_desc->type().type == TYPE_VARCHAR) {
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_VARCHAR, false);
} else if (slot_desc != nullptr && slot_desc->type().type == TYPE_CHAR) {
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_CHAR, false);
} else {
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_STRING, false);
}
break;
case AVRO_DECIMAL:
read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_DECIMAL, false);
break;
default:
return Status::Expected(Substitute(
"Failed to codegen MaterializeTuple() due to unsupported type: $0",
element.schema->type));
}
// Call appropriate ReadAvro<Type> function
llvm::Value* write_slot_val = builder->getFalse();
llvm::Value* slot_type_val = builder->getInt32(0);
llvm::Value* opaque_slot_val = codegen->null_ptr_value();
if (slot_desc != nullptr) {
// Field corresponds to a materialized column, fill in relevant arguments
write_slot_val = builder->getTrue();
if (slot_desc->type().type == TYPE_DECIMAL) {
// ReadAvroDecimal() takes slot byte size instead of slot type
slot_type_val = builder->getInt32(slot_desc->type().GetByteSize());
} else {
slot_type_val = builder->getInt32(slot_desc->type().type);
}
llvm::Value* slot_val =
builder->CreateStructGEP(nullptr, tuple_val, slot_desc->llvm_field_idx(), "slot");
opaque_slot_val =
builder->CreateBitCast(slot_val, codegen->ptr_type(), "opaque_slot");
}
  // NOTE: ReadAvroVarchar()/ReadAvroChar() have a different signature than the other
  // read functions.
if (slot_desc != nullptr &&
(slot_desc->type().type == TYPE_VARCHAR || slot_desc->type().type == TYPE_CHAR)) {
// Need to pass an extra argument (the length) to the codegen function.
llvm::Value* fixed_len = builder->getInt32(slot_desc->type().len);
llvm::Value* read_field_args[] = {this_val, slot_type_val, fixed_len, data_val,
data_end_val, write_slot_val, opaque_slot_val, pool_val};
*ret_val = builder->CreateCall(read_field_fn, read_field_args, "success");
} else {
llvm::Value* read_field_args[] = {this_val, slot_type_val, data_val, data_end_val,
write_slot_val, opaque_slot_val, pool_val};
*ret_val = builder->CreateCall(read_field_fn, read_field_args, "success");
}
return Status::OK();
}
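// DecodeAvroData() is cross-compiled to IR as a template function: the call sites for
// InitTuple(), MaterializeTuple(), EvalConjuncts(), and CopyStrings() within it are
// replaced below with the specialized functions generated for this scan.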
Status HdfsAvroScanner::CodegenDecodeAvroData(const HdfsScanNodeBase* node,
LlvmCodeGen* codegen, const vector<ScalarExpr*>& conjuncts,
llvm::Function** decode_avro_data_fn) {
llvm::Function* materialize_tuple_fn;
RETURN_IF_ERROR(CodegenMaterializeTuple(node, codegen, &materialize_tuple_fn));
DCHECK(materialize_tuple_fn != nullptr);
llvm::Function* fn = codegen->GetFunction(IRFunction::DECODE_AVRO_DATA, true);
llvm::Function* init_tuple_fn;
RETURN_IF_ERROR(CodegenInitTuple(node, codegen, &init_tuple_fn));
int replaced = codegen->ReplaceCallSites(fn, init_tuple_fn, "InitTuple");
DCHECK_REPLACE_COUNT(replaced, 1);
replaced = codegen->ReplaceCallSites(fn, materialize_tuple_fn, "MaterializeTuple");
DCHECK_REPLACE_COUNT(replaced, 1);
llvm::Function* eval_conjuncts_fn;
RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjuncts, &eval_conjuncts_fn));
replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, "EvalConjuncts");
DCHECK_REPLACE_COUNT(replaced, 1);
llvm::Function* copy_strings_fn;
RETURN_IF_ERROR(Tuple::CodegenCopyStrings(
codegen, *node->tuple_desc(), &copy_strings_fn));
replaced = codegen->ReplaceCallSites(fn, copy_strings_fn, "CopyStrings");
DCHECK_REPLACE_COUNT(replaced, 1);
int tuple_byte_size = node->tuple_desc()->byte_size();
replaced = codegen->ReplaceCallSitesWithValue(fn,
codegen->GetI32Constant(tuple_byte_size), "tuple_byte_size");
DCHECK_REPLACE_COUNT(replaced, 1);
fn->setName("DecodeAvroData");
*decode_avro_data_fn = codegen->FinalizeFunction(fn);
if (*decode_avro_data_fn == nullptr) {
return Status("Failed to finalize decode_avro_data_fn.");
}
return Status::OK();
}