blob: bec2c27d2b19554446b2ba14ea08b9eddbfcf993 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/orc/orc-column-readers.h"
#include <queue>
#include "runtime/collection-value-builder.h"
#include "runtime/date-value.h"
#include "runtime/decimal-value.h"
#include "runtime/string-value.inline.h"
#include "runtime/timestamp-value.inline.h"
#include "util/mem-util.h"
#include "common/names.h"
using namespace impala;
namespace impala {
// Builds a short description of an ORC type node for log and DCHECK messages: the
// type's string form followed by its ORC column id.
string PrintNode(const orc::Type* node) {
  const auto orc_col_id = node->getColumnId();
  return Substitute("$0 column (ORC id=$1)", node->toString(), orc_col_id);
}
bool OrcRowValidator::IsRowBatchValid() const {
if (write_ids_ == nullptr || write_ids_->numElements == 0) return true;
int64_t write_id = write_ids_->data[0];
return valid_write_ids_.IsWriteIdValid(write_id);
}
// Factory that builds the column reader matching the ORC type 'node' and the slot
// 'slot_desc'. STRUCT, LIST and MAP nodes get their dedicated complex readers; for
// every other ORC type the reader is selected by the primitive type of the slot.
// The new reader is added to the scanner's object pool and the pointer returned by
// the pool is handed back to the caller. Both 'node' and 'slot_desc' must be non-null.
OrcColumnReader* OrcColumnReader::Create(const orc::Type* node,
    const SlotDescriptor* slot_desc, HdfsOrcScanner* scanner) {
  DCHECK(node != nullptr);
  DCHECK(slot_desc != nullptr);
  OrcColumnReader* created = nullptr;
  switch (node->getKind()) {
    case orc::TypeKind::STRUCT: {
      DCHECK_EQ(scanner->slot_to_col_id_.count(slot_desc), 1);
      const bool node_matches_slot_desc =
          scanner->slot_to_col_id_[slot_desc] == node->getColumnId();
      if (!node_matches_slot_desc || !slot_desc->type().IsStructType()) {
        // A descendant of 'node' corresponds to 'slot_desc'. The struct referenced by
        // 'node' will not be materialised.
        created = new OrcStructReader(node, slot_desc, scanner);
      } else {
        // 'node' corresponds to 'slot_desc' and 'slot_desc' refers to a struct: this
        // is the case where the struct and its children should be materialized.
        // Note: 'node' could also correspond to 'slot_desc' if 'slot_desc' refers to
        // an array.
        DCHECK(slot_desc->children_tuple_descriptor() != nullptr);
        created = new OrcStructReader(node, slot_desc,
            slot_desc->children_tuple_descriptor(), scanner);
      }
      break;
    }
    case orc::TypeKind::LIST:
      created = new OrcListReader(node, slot_desc, scanner);
      break;
    case orc::TypeKind::MAP:
      created = new OrcMapReader(node, slot_desc, scanner);
      break;
    default:
      // Primitive ORC type: dispatch on the type of the destination slot.
      switch (slot_desc->type().type) {
        case TYPE_BOOLEAN:
          created = new OrcBoolColumnReader(node, slot_desc, scanner);
          break;
        case TYPE_TINYINT:
          created = new OrcIntColumnReader<int8_t>(node, slot_desc, scanner);
          break;
        case TYPE_SMALLINT:
          created = new OrcIntColumnReader<int16_t>(node, slot_desc, scanner);
          break;
        case TYPE_INT:
          created = new OrcIntColumnReader<int32_t>(node, slot_desc, scanner);
          break;
        case TYPE_BIGINT:
          created = new OrcIntColumnReader<int64_t>(node, slot_desc, scanner);
          break;
        case TYPE_FLOAT:
          created = new OrcDoubleColumnReader<float>(node, slot_desc, scanner);
          break;
        case TYPE_DOUBLE:
          created = new OrcDoubleColumnReader<double>(node, slot_desc, scanner);
          break;
        case TYPE_TIMESTAMP:
          created = new OrcTimestampReader(node, slot_desc, scanner);
          break;
        case TYPE_STRING:
        case TYPE_VARCHAR:
        case TYPE_CHAR:
          created = new OrcStringColumnReader(node, slot_desc, scanner);
          break;
        case TYPE_DECIMAL: {
          const auto orc_precision = node->getPrecision();
          if (orc_precision == 0 || orc_precision > 18) {
            // For decimals whose precision is larger than 18, its value can't fit into
            // an int64 (10^19 > 2^63). So we should use int128 for this case.
            created = new OrcDecimal16ColumnReader(node, slot_desc, scanner);
            break;
          }
          // Otherwise the reader is specialised on the byte size of the slot.
          switch (slot_desc->type().GetByteSize()) {
            case 4:
              created = new OrcDecimalColumnReader<Decimal4Value>(
                  node, slot_desc, scanner);
              break;
            case 8:
              created = new OrcDecimalColumnReader<Decimal8Value>(
                  node, slot_desc, scanner);
              break;
            case 16:
              created = new OrcDecimalColumnReader<Decimal16Value>(
                  node, slot_desc, scanner);
              break;
            default:
              DCHECK(false) << "invalidate byte size for decimal type: "
                            << slot_desc->type().GetByteSize();
          }
          break;
        }
        case TYPE_DATE:
          created = new OrcDateColumnReader(node, slot_desc, scanner);
          break;
        default:
          DCHECK(false) << slot_desc->type().DebugString();
      }
      break;
  }
  return scanner->obj_pool_.Add(created);
}
// Creates the top level reader for 'node', which must be a STRUCT, LIST or MAP type.
// The reader is built from 'table_tuple_desc' (not from a slot descriptor), added to
// the scanner's object pool, and the pointer returned by the pool is returned.
OrcComplexColumnReader* OrcComplexColumnReader::CreateTopLevelReader(
    const orc::Type* node, const TupleDescriptor* table_tuple_desc,
    HdfsOrcScanner* scanner) {
  OrcComplexColumnReader* top_reader = nullptr;
  switch (node->getKind()) {
    case orc::TypeKind::STRUCT:
      top_reader = new OrcStructReader(node, table_tuple_desc, scanner);
      break;
    case orc::TypeKind::LIST:
      top_reader = new OrcListReader(node, table_tuple_desc, scanner);
      break;
    case orc::TypeKind::MAP:
      top_reader = new OrcMapReader(node, table_tuple_desc, scanner);
      break;
    default:
      DCHECK(false) << "Can't create top level reader for " << PrintNode(node);
      break;
  }
  return scanner->obj_pool_.Add(top_reader);
}
// Stores the slot descriptor and scanner, and records the ORC column id of
// 'orc_type' (which must not be null). A null 'slot_desc' is only expected for the
// complex types LIST, MAP and STRUCT.
OrcColumnReader::OrcColumnReader(const orc::Type* orc_type,
    const SlotDescriptor* slot_desc, HdfsOrcScanner* scanner)
  : slot_desc_(slot_desc), scanner_(scanner) {
  orc_column_id_ = DCHECK_NOTNULL(orc_type)->getColumnId();
  if (slot_desc_ == nullptr) {
    const orc::TypeKind kind = orc_type->getKind();
    const bool is_complex_type = kind == orc::TypeKind::LIST
        || kind == orc::TypeKind::MAP
        || kind == orc::TypeKind::STRUCT;
    DCHECK(is_complex_type) << "Selected primitive types should have SlotDescriptors";
  }
  VLOG(3) << "Created reader for " << PrintNode(orc_type) << ": slot_desc_="
          << (slot_desc_ != nullptr ? slot_desc_->DebugString() : "null");
}
// Writes the boolean at 'row_idx' of the current batch into the slot of 'tuple', or
// sets the slot to NULL if the batch value is NULL. Any non-zero value in the batch
// is stored as true. 'pool' is not used.
Status OrcBoolColumnReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
  if (!IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
    const int64_t raw_value = batch_->data.data()[row_idx];
    bool* dst = reinterpret_cast<bool*>(GetSlot(tuple));
    *dst = raw_value != 0;
  } else {
    SetNullSlot(tuple);
  }
  return Status::OK();
}
// Copies the contents of the ORC data buffer 'blob' into a buffer allocated from
// 'pool' and remembers it in 'blob_'. Returns a memory-limit-exceeded status if the
// allocation fails.
Status OrcStringColumnReader::InitBlob(orc::DataBuffer<char>* blob, MemPool* pool) {
  // TODO: IMPALA-9310: Possible improvement is moving the buffer out from orc::DataBuffer
  // instead of copying and let Impala free the memory later.
  const auto num_bytes = blob->size();
  blob_ = reinterpret_cast<char*>(pool->TryAllocateUnaligned(num_bytes));
  if (UNLIKELY(blob_ == nullptr)) {
    string details = Substitute("Could not allocate string buffer of $0 bytes "
        "for ORC file '$1'.", num_bytes, scanner_->filename());
    return scanner_->scan_node_->mem_tracker()->MemLimitExceeded(
        scanner_->state_, details, num_bytes);
  }
  memcpy(blob_, blob->data(), num_bytes);
  return Status::OK();
}
// Reads a batch of string values starting at 'row_idx' into 'scratch_batch' starting
// at 'scratch_batch_idx'. Dispatches to the ReadValueBatchInternal() instantiation
// that matches the slot type (STRING, CHAR or VARCHAR) and whether the current batch
// is dictionary encoded. Any other slot type yields an error status.
Status OrcStringColumnReader::ReadValueBatch(int row_idx,
    ScratchTupleBatch* scratch_batch, MemPool* pool, int scratch_batch_idx) {
  switch (slot_desc_->type().type) {
    case TYPE_STRING:
      return batch_->isEncoded
          ? ReadValueBatchInternal<TYPE_STRING, true>(
                row_idx, scratch_batch, scratch_batch_idx)
          : ReadValueBatchInternal<TYPE_STRING, false>(
                row_idx, scratch_batch, scratch_batch_idx);
    case TYPE_CHAR:
      return batch_->isEncoded
          ? ReadValueBatchInternal<TYPE_CHAR, true>(
                row_idx, scratch_batch, scratch_batch_idx)
          : ReadValueBatchInternal<TYPE_CHAR, false>(
                row_idx, scratch_batch, scratch_batch_idx);
    case TYPE_VARCHAR:
      return batch_->isEncoded
          ? ReadValueBatchInternal<TYPE_VARCHAR, true>(
                row_idx, scratch_batch, scratch_batch_idx)
          : ReadValueBatchInternal<TYPE_VARCHAR, false>(
                row_idx, scratch_batch, scratch_batch_idx);
    default:
      return Status("Illegal string type: " + TypeToString(slot_desc_->type().type));
  }
}
// Reads the single string value at 'row_idx' into 'tuple'. Dispatches to the
// ReadValueInternal() instantiation that matches the slot type (STRING, CHAR or
// VARCHAR) and whether the current batch is dictionary encoded. Any other slot type
// yields an error status. 'pool' is not used.
Status OrcStringColumnReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
  switch (slot_desc_->type().type) {
    case TYPE_STRING:
      return batch_->isEncoded
          ? ReadValueInternal<TYPE_STRING, true>(row_idx, tuple)
          : ReadValueInternal<TYPE_STRING, false>(row_idx, tuple);
    case TYPE_CHAR:
      return batch_->isEncoded
          ? ReadValueInternal<TYPE_CHAR, true>(row_idx, tuple)
          : ReadValueInternal<TYPE_CHAR, false>(row_idx, tuple);
    case TYPE_VARCHAR:
      return batch_->isEncoded
          ? ReadValueInternal<TYPE_VARCHAR, true>(row_idx, tuple)
          : ReadValueInternal<TYPE_VARCHAR, false>(row_idx, tuple);
    default:
      return Status("Illegal string type: " + TypeToString(slot_desc_->type().type));
  }
}
// Reads as many values as fit: the minimum of the free space left in 'scratch_batch'
// after 'scratch_batch_idx' and the number of values left in the ORC batch after
// 'row_idx'. Each value is written into the tuple at the matching position of the
// scratch batch's tuple memory. On success 'scratch_batch->num_tuples' is set to the
// index one past the last tuple written.
template<PrimitiveType SLOT_TYPE, bool ENCODED>
Status OrcStringColumnReader::ReadValueBatchInternal(int row_idx,
    ScratchTupleBatch* scratch_batch, int scratch_batch_idx) {
  const int num_to_read = std::min<int>(scratch_batch->capacity - scratch_batch_idx,
      NumElements() - row_idx);
  DCHECK_LE(row_idx + num_to_read, NumElements());
  int dst_pos = scratch_batch_idx;
  for (int src_row = row_idx; src_row < row_idx + num_to_read; ++src_row, ++dst_pos) {
    uint8_t* tuple_mem = scratch_batch->tuple_mem +
        dst_pos * OrcColumnReader::scanner_->tuple_byte_size();
    // The status is kept in a local because the template argument list contains a
    // comma, which cannot be passed directly to the RETURN_IF_ERROR macro.
    Status read_status =
        ReadValueInternal<SLOT_TYPE, ENCODED>(src_row, reinterpret_cast<Tuple*>(tuple_mem));
    RETURN_IF_ERROR(read_status);
  }
  scratch_batch->num_tuples = scratch_batch_idx + num_to_read;
  return Status::OK();
}
// Materializes the string value at 'row_idx' of the current batch into the slot of
// 'tuple'.
//  - SLOT_TYPE selects the destination layout: TYPE_CHAR is copied inline into the
//    slot and padded with spaces up to the declared length; TYPE_STRING and
//    TYPE_VARCHAR are stored as a StringValue pointing into 'blob_', with VARCHAR
//    truncated to the declared length.
//  - ENCODED selects how the source bytes are located: through the dictionary
//    offsets of an encoded batch, or through the per-row data pointer and length.
// Returns an error status if the file is found to be corrupt: a dictionary index
// outside of the dictionary, or a negative string length.
template<PrimitiveType SLOT_TYPE, bool ENCODED>
Status OrcStringColumnReader::ReadValueInternal(int row_idx, Tuple* tuple) {
  if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
    SetNullSlot(tuple);
    return Status::OK();
  }
  char* src_ptr;
  int src_len;
  if (ENCODED) {
    DCHECK(batch_->isEncoded);
    orc::EncodedStringVectorBatch* currentBatch =
        static_cast<orc::EncodedStringVectorBatch*>(batch_);
    orc::DataBuffer<int64_t>& offsets = currentBatch->dictionary->dictionaryOffset;
    int64_t index = currentBatch->index[row_idx];
    // Entry 'index' spans offsets[index]..offsets[index + 1], so both must exist.
    if (UNLIKELY(index < 0 || static_cast<uint64_t>(index) + 1 >= offsets.size())) {
      return Status(Substitute("Corrupt ORC file: $0. Index ($1) out of range [0, $2) in "
          "StringDictionary.", scanner_->filename(), index, offsets.size()));
    }
    src_ptr = blob_ + offsets[index];
    src_len = offsets[index + 1] - offsets[index];
  } else {
    // The pointed data is now in blob_, a buffer handled by Impala.
    src_ptr = blob_ + (batch_->data[row_idx] - batch_->blob.data());
    src_len = batch_->length[row_idx];
  }
  // A negative length can only come from a corrupt file (e.g. non-monotonic dictionary
  // offsets). Reject it here, otherwise it would be converted to a huge size in the
  // memcpy() below or stored as the length of the StringValue.
  if (UNLIKELY(src_len < 0)) {
    return Status(Substitute("Corrupt ORC file: $0. Negative string length ($1).",
        scanner_->filename(), src_len));
  }
  int dst_len = slot_desc_->type().len;
  if (SLOT_TYPE == TYPE_CHAR) {
    DCHECK_EQ(slot_desc_->type().type, TYPE_CHAR);
    // Copy at most the declared CHAR length, then pad the remainder with spaces.
    int unpadded_len = min(dst_len, src_len);
    char* dst_char = reinterpret_cast<char*>(GetSlot(tuple));
    memcpy(dst_char, src_ptr, unpadded_len);
    StringValue::PadWithSpaces(dst_char, dst_len, unpadded_len);
    return Status::OK();
  }
  StringValue* dst = reinterpret_cast<StringValue*>(GetSlot(tuple));
  dst->Assign(src_ptr, src_len);
  // VARCHAR values longer than the declared length are truncated.
  if (SLOT_TYPE == TYPE_VARCHAR && dst_len < src_len) {
    dst->SetLen(dst_len);
  }
  return Status::OK();
}
// Converts the seconds/nanoseconds pair at 'row_idx' into a TimestampValue using
// 'timezone_' and stores it in the slot of 'tuple'. NULL values set the slot to NULL.
// If the converted timestamp has no date, the slot is set to NULL and an
// ORC_TIMESTAMP_OUT_OF_RANGE error is logged or returned through the runtime state.
// 'pool' is not used.
Status OrcTimestampReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
  if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
    SetNullSlot(tuple);
    return Status::OK();
  }
  const int64_t seconds = batch_->data.data()[row_idx];
  const int64_t nanoseconds = batch_->nanoseconds.data()[row_idx];
  TimestampValue* dst = reinterpret_cast<TimestampValue*>(GetSlot(tuple));
  *dst = TimestampValue::FromUnixTimeNanos(seconds, nanoseconds, timezone_);
  if (UNLIKELY(!dst->HasDate())) {
    SetNullSlot(tuple);
    ErrorMsg msg(
        TErrorCode::ORC_TIMESTAMP_OUT_OF_RANGE, scanner_->filename(), orc_column_id_);
    return scanner_->state_->LogOrReturnError(msg);
  }
  return Status::OK();
}
// Builds a DateValue from the value at 'row_idx' and stores it in the slot of
// 'tuple'. NULL values set the slot to NULL. If the resulting DateValue is not valid,
// the slot is set to NULL and an ORC_DATE_OUT_OF_RANGE error is logged or returned
// through the runtime state. 'pool' is not used.
Status OrcDateColumnReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
  if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
    SetNullSlot(tuple);
    return Status::OK();
  }
  DateValue date(batch_->data.data()[row_idx]);
  if (UNLIKELY(!date.IsValid())) {
    SetNullSlot(tuple);
    ErrorMsg msg(TErrorCode::ORC_DATE_OUT_OF_RANGE, scanner_->filename(), orc_column_id_);
    return scanner_->state_->LogOrReturnError(msg);
  }
  *reinterpret_cast<DateValue*>(GetSlot(tuple)) = date;
  return Status::OK();
}
// Copies the 128-bit decimal at 'row_idx' into the 16-byte slot of 'tuple'. NULL
// values set the slot to NULL. 'pool' is not used.
Status OrcDecimal16ColumnReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
  if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
    SetNullSlot(tuple);
    return Status::OK();
  }
  orc::Int128 orc_val = batch_->values.data()[row_idx];
  DCHECK_EQ(slot_desc_->type().GetByteSize(), 16);
  // Assemble the two's complement bit pattern in unsigned arithmetic. The high bits
  // are negative for negative decimals, and left-shifting a negative signed value is
  // undefined behavior before C++20. The resulting 128 bits are the same as those of
  // the signed value.
  __uint128_t bits = static_cast<uint64_t>(orc_val.getHighBits());
  bits <<= 64;
  bits |= orc_val.getLowBits();
  // Use memcpy to avoid gcc generating unaligned instructions like movaps
  // for int128_t. They will raise SegmentFault when addresses are not
  // aligned to 16 bytes.
  static_assert(sizeof(bits) == sizeof(__int128_t), "unexpected 128-bit integer size");
  memcpy(GetSlot(tuple), &bits, sizeof(bits));
  return Status::OK();
}
// Constructor used for top level readers, which have no slot descriptor. The reader
// materializes tuples only if 'table_tuple_desc' maps to exactly this ORC node; in
// that case 'tuple_desc_' is set to 'table_tuple_desc'.
OrcComplexColumnReader::OrcComplexColumnReader(const orc::Type* node,
    const TupleDescriptor* table_tuple_desc, HdfsOrcScanner* scanner)
  : OrcBatchedReader(node, nullptr, scanner) {
  const uint64_t orc_col_id = node->getColumnId();
  const uint64_t target_col_id = GetColId(table_tuple_desc);
  if (orc_col_id == target_col_id) {
    tuple_desc_ = table_tuple_desc;
  }
  materialize_tuple_ = tuple_desc_ != nullptr;
  VLOG(3) << "Created top level ComplexColumnReader for " << PrintNode(node)
          << ": tuple_desc_="
          << (tuple_desc_ != nullptr ? tuple_desc_->DebugString() : "null");
}
// Returns true when there is no current ORC batch or when all of its rows have been
// consumed. Expected to be called on the top level reader only.
bool OrcStructReader::EndOfBatch() {
  DCHECK(slot_desc_ == nullptr
      && (tuple_desc_ == nullptr || tuple_desc_ == scanner_->scan_node_->tuple_desc()))
      << "Should be top level reader when calling EndOfBatch()";
  if (vbatch_ == nullptr) return true;
  DCHECK(row_idx_ <= NumElements());
  return row_idx_ == NumElements();
}
// Returns the ORC column id that 'slot_desc' targets. For collection slots this is
// the column id of the slot's children tuple descriptor, otherwise the column id of
// the slot itself.
inline uint64_t OrcComplexColumnReader::GetTargetColId(
    const SlotDescriptor* slot_desc) const {
  if (slot_desc->type().IsCollectionType()) {
    return GetColId(slot_desc->children_tuple_descriptor());
  }
  return GetColId(slot_desc);
}
/**
 * Returns true if 'candidate_col_id' lies in the column id range covered by 'node',
 * i.e. between the column id of 'node' and its maximum column id, both inclusive.
 * Note that the id of 'node' itself also qualifies.
 */
inline bool IsDescendant(const orc::Type& node, uint64_t candidate_col_id) {
  if (candidate_col_id < node.getColumnId()) return false;
  return candidate_col_id <= node.getMaximumColumnId();
}
/**
 * 'parent' is a complex type that might have multiple children, i.e. STRUCT or MAP.
 * 'descendant_col_id' refers to an ORC type somewhere under 'parent' in the type
 * hierarchy. This function selects the first direct child of 'parent' whose column id
 * range contains 'descendant_col_id'. On success the child is returned in '*child',
 * its index among the subtypes of 'parent' in '*field', and the result is true.
 * Returns false if there is no such child; '*child' and '*field' are then left
 * untouched.
 */
bool FindChild(const orc::Type& parent, uint64_t descendant_col_id,
    const orc::Type** child, int* field) {
  DCHECK(parent.getKind() == orc::TypeKind::STRUCT ||
         parent.getKind() == orc::TypeKind::MAP);
  const int num_subtypes = parent.getSubtypeCount();
  for (int idx = 0; idx < num_subtypes; ++idx) {
    const orc::Type* candidate = parent.getSubtype(idx);
    if (candidate == nullptr) continue;
    if (!IsDescendant(*candidate, descendant_col_id)) continue;
    *child = candidate;
    *field = idx;
    return true;
  }
  return false;
}
// Creates the child reader responsible for 'slot_desc' and appends it to 'children_',
// recording the index of the corresponding struct field in 'children_fields_'.
// 'slot_desc' matches a descendant of 'curr_node' which may not be a direct child, so
// the reader is created for the direct child that lies on the path from 'curr_node'
// to that descendant, and 'slot_desc' is passed down to it.
void OrcStructReader::CreateChildForSlot(const orc::Type* curr_node,
    const SlotDescriptor* slot_desc) {
  const orc::Type* path_child;
  int field_idx;
  const bool found =
      FindChild(*curr_node, GetTargetColId(slot_desc), &path_child, &field_idx);
  if (!found) {
    DCHECK(false) << PrintNode(curr_node) << " has no children selected for "
                  << slot_desc->DebugString();
  }
  children_.push_back(OrcColumnReader::Create(path_child, slot_desc, scanner_));
  children_fields_.push_back(field_idx);
}
// Constructor for top level struct readers, driven by a tuple descriptor.
// If this reader materializes tuples, one child reader is created per slot of the
// tuple descriptor, skipping partition key slots, missing fields and virtual slots.
// Otherwise a single top level child reader is created for the subtype on the path
// to the descendant that matches 'table_tuple_desc'.
// When row batches need validation and this is the root struct (column id 0) of the
// scan node's tuple descriptor, a row validator is set up and the index of the field
// whose column id is CURRENT_TRANSCACTION_TYPE_ID is recorded.
OrcStructReader::OrcStructReader(const orc::Type* node,
    const TupleDescriptor* table_tuple_desc, HdfsOrcScanner* scanner)
  : OrcComplexColumnReader(node, table_tuple_desc, scanner) {
  const bool needs_row_validation =
      table_tuple_desc == scanner_->scan_node_->tuple_desc()
      && node->getColumnId() == 0
      && scanner_->row_batches_need_validation_;
  if (!materialize_tuple_) {
    // No tuples should be materialized by this reader, because 'table_tuple_desc'
    // matches to a descendant of 'node'. Those tuples should be materialized by the
    // corresponding descendant reader. So 'node' should have exactly one selected
    // subtype: the child in the path to the target descendant. With row validation
    // there is one more subtype and the target child is at index 1.
    DCHECK_EQ(node->getSubtypeCount(), needs_row_validation ? 2 : 1);
    const int child_index = needs_row_validation ? 1 : 0;
    OrcComplexColumnReader* top_level_child =
        OrcComplexColumnReader::CreateTopLevelReader(
            node->getSubtype(child_index), table_tuple_desc, scanner);
    children_.push_back(top_level_child);
    children_fields_.push_back(child_index);
  } else {
    for (SlotDescriptor* child_slot : tuple_desc_->slots()) {
      // Skip partition columns, missed columns and virtual columns.
      const bool skip_slot = scanner->IsPartitionKeySlot(child_slot)
          || scanner->IsMissingField(child_slot)
          || child_slot->IsVirtual();
      if (!skip_slot) CreateChildForSlot(node, child_slot);
    }
  }
  if (!needs_row_validation) return;
  row_validator_.reset(new OrcRowValidator(scanner_->valid_write_ids_));
  const int num_subtypes = node->getSubtypeCount();
  for (int field = 0; field < num_subtypes; ++field) {
    if (node->getSubtype(field)->getColumnId() == CURRENT_TRANSCACTION_TYPE_ID) {
      current_write_id_field_index_ = field;
      break;
    }
  }
}
// Constructor for a struct that is materialized into 'slot_desc'. The struct's
// children are described by 'children_tuple'; a child reader is created for each of
// its slots.
OrcStructReader::OrcStructReader(const orc::Type* node, const SlotDescriptor* slot_desc,
    const TupleDescriptor* children_tuple, HdfsOrcScanner* scanner)
  : OrcComplexColumnReader(node, slot_desc, scanner) {
  materialize_tuple_ = true;
  tuple_desc_ = children_tuple;
  const auto& child_slots = tuple_desc_->slots();
  for (auto it = child_slots.begin(); it != child_slots.end(); ++it) {
    CreateChildForSlot(node, *it);
  }
}
// Constructor for a struct that is not materialized itself: 'slot_desc' won't map to
// this STRUCT column, it only matches a descendant column. A child reader is created
// on the path to that descendant, unless the descendant column is missing in the
// file, in which case no child reader is created (and nothing is logged).
OrcStructReader::OrcStructReader(const orc::Type* node,
    const SlotDescriptor* slot_desc, HdfsOrcScanner* scanner)
  : OrcComplexColumnReader(node, slot_desc, scanner) {
  const bool field_missing = scanner->IsMissingField(slot_desc);
  if (!field_missing) {
    CreateChildForSlot(node, slot_desc);
    VLOG(3) << "Created StructReader for " << PrintNode(node) << ": slot_desc_="
            << slot_desc->DebugString();
  }
}
// Reads the struct value at 'row_idx' into 'tuple'.
// If this reader does not materialize tuples, the call is forwarded to its single
// child. Otherwise a NULL struct sets the slot to NULL, and a non-NULL struct is read
// by letting every child read its value at 'row_idx'. Stops at the first child error.
Status OrcStructReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
  if (MaterializeTuple()) {
    if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
      SetNullSlot(tuple);
      return Status::OK();
    }
    for (OrcColumnReader* field_reader : children_) {
      RETURN_IF_ERROR(field_reader->ReadValue(row_idx, tuple, pool));
    }
    return Status::OK();
  }
  DCHECK_EQ(1, children_.size());
  return children_[0]->ReadValue(row_idx, tuple, pool);
}
// Reads the next rows of the current ORC batch into 'scratch_batch', starting at row
// 'row_idx_' and appending after the tuples already in the scratch batch.
// If a row validator exists and reports the batch as invalid, the whole ORC batch is
// skipped. Every child must produce the same number of tuples, otherwise a corrupt
// file error is returned. If a file position slot is requested, the virtual row id
// column is filled for the rows read. 'row_idx_' is advanced by the rows consumed.
Status OrcStructReader::TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch,
    MemPool* pool) {
  // Validate row batch if needed.
  if (row_validator_) {
    DCHECK(scanner_->row_batches_need_validation_);
    if (!row_validator_->IsRowBatchValid()) {
      // Mark the whole batch as consumed without producing any tuples.
      row_idx_ = NumElements();
      return Status::OK();
    }
  }
  // Saving the initial value of num_tuples because each child->ReadValueBatch() will
  // update it.
  const int scratch_batch_idx = scratch_batch->num_tuples;
  int item_count = -1;
  for (OrcColumnReader* child : children_) {
    RETURN_IF_ERROR(
        child->ReadValueBatch(row_idx_, scratch_batch, pool, scratch_batch_idx));
    // Check if each column reader reads the same amount of values. The first child
    // defines the expected count.
    if (item_count == -1) {
      item_count = scratch_batch->num_tuples;
    } else if (item_count != scratch_batch->num_tuples) {
      return Status(Substitute("Corrupt ORC file '$0': Expected number of items in "
          "each column: $1 Actual number in col '$2': $3", scanner_->filename(),
          item_count, orc_column_id_, scratch_batch->num_tuples));
    }
  }
  int num_rows_read = scratch_batch->num_tuples - scratch_batch_idx;
  if (children_.empty()) {
    // We allow empty 'children_' in all cases. No child updated 'num_tuples', so
    // account here for the rows that fit into the scratch batch.
    DCHECK_EQ(0, num_rows_read);
    num_rows_read = std::min(scratch_batch->capacity - scratch_batch->num_tuples,
        NumElements() - row_idx_);
    scratch_batch->num_tuples += num_rows_read;
  }
  if (scanner_->file_position_ != nullptr) {
    FillVirtualRowIdColumn(scratch_batch, scratch_batch_idx, num_rows_read);
  }
  row_idx_ += num_rows_read;
  return Status::OK();
}
// Writes consecutive file row indexes into the file position slot of 'num_rows'
// tuples of 'scratch_batch', starting with the tuple at 'scratch_batch_idx'.
// 'file_row_idx_' is incremented once per row written. Requires that the scanner has
// a file position slot.
void OrcStructReader::FillVirtualRowIdColumn(ScratchTupleBatch* scratch_batch,
    int scratch_batch_idx, int num_rows) {
  DCHECK(scanner_->file_position_ != nullptr);
  int tuple_size = OrcColumnReader::scanner_->tuple_byte_size();
  Tuple* first_tuple = reinterpret_cast<Tuple*>(
      scratch_batch->tuple_mem + scratch_batch_idx * tuple_size);
  int64_t* first_slot =
      first_tuple->GetBigIntSlot(scanner_->file_position_->tuple_offset());
  // Consecutive tuples are 'tuple_size' bytes apart, so the slot is written with that
  // stride.
  StrideWriter<int64_t> writer{first_slot, tuple_size};
  for (int remaining = num_rows; remaining > 0; --remaining) {
    *writer.Advance() = file_row_idx_++;
  }
}
// Reads a batch of values into 'scratch_batch'. A non-materializing struct forwards
// the call to each of its children; a materializing struct uses the generic
// OrcBatchedReader implementation.
Status OrcStructReader::ReadValueBatch(int row_idx, ScratchTupleBatch* scratch_batch,
    MemPool* pool, int scratch_batch_idx) {
  if (!materialize_tuple_) {
    for (OrcColumnReader* child_reader : children_) {
      RETURN_IF_ERROR(child_reader->ReadValueBatch(
          row_idx, scratch_batch, pool, scratch_batch_idx));
    }
    return Status::OK();
  }
  return OrcBatchedReader::ReadValueBatch(
      row_idx, scratch_batch, pool, scratch_batch_idx);
}
// Installs 'orc_batch' as the current struct batch and resets the row index.
// For a null or empty batch every child receives a null batch. Otherwise each child
// receives the field batch at its recorded field index, and the row validator (if
// any) receives the batch of the write id field.
Status OrcStructReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
  RETURN_IF_ERROR(OrcComplexColumnReader::UpdateInputBatch(orc_batch));
  batch_ = static_cast<orc::StructVectorBatch*>(orc_batch);
  // In debug mode, we use dynamic_cast<> to double-check the downcast is legal
  DCHECK(batch_ == dynamic_cast<orc::StructVectorBatch*>(orc_batch));
  row_idx_ = 0;
  const bool no_rows = batch_ == nullptr || batch_->numElements == 0;
  if (no_rows) {
    for (OrcColumnReader* child_reader : children_) {
      RETURN_IF_ERROR(child_reader->UpdateInputBatch(nullptr));
    }
    return Status::OK();
  }
  const int num_children = children_.size();
  for (int child_idx = 0; child_idx < num_children; ++child_idx) {
    orc::ColumnVectorBatch* field_batch = batch_->fields[children_fields_[child_idx]];
    RETURN_IF_ERROR(children_[child_idx]->UpdateInputBatch(field_batch));
  }
  if (row_validator_) {
    orc::ColumnVectorBatch* write_id_batch =
        batch_->fields[current_write_id_field_index_];
    DCHECK_EQ(static_cast<orc::LongVectorBatch*>(write_id_batch),
        dynamic_cast<orc::LongVectorBatch*>(write_id_batch));
    row_validator_->UpdateTransactionBatch(
        static_cast<orc::LongVectorBatch*>(write_id_batch));
  }
  return Status::OK();
}
// Constructor driven by a slot descriptor. If 'slot_desc' is a collection slot whose
// item TupleDescriptor matches 'node', this reader materializes the slot (creating a
// CollectionValue) and its collection tuples (see more in
// HdfsOrcScanner::AssembleCollection). In any other case the tuple descriptor and the
// materialization flag are left as set by the base class.
OrcCollectionReader::OrcCollectionReader(const orc::Type* node,
    const SlotDescriptor* slot_desc, HdfsOrcScanner* scanner)
  : OrcComplexColumnReader(node, slot_desc, scanner) {
  if (!slot_desc->type().IsCollectionType()) return;
  if (node->getColumnId() != GetTargetColId(slot_desc)) return;
  tuple_desc_ = slot_desc->children_tuple_descriptor();
  materialize_tuple_ = true;
}
// Materializes the collection at 'row_idx' into the collection slot of 'tuple'.
// A NULL collection sets the slot to NULL. Otherwise the slot is reset to an empty
// CollectionValue and the scanner assembles the items through a
// CollectionValueBuilder that allocates from 'pool'.
Status OrcCollectionReader::AssembleCollection(int row_idx, Tuple* tuple, MemPool* pool) {
  if (IsNull(DCHECK_NOTNULL(vbatch_), row_idx)) {
    SetNullSlot(tuple);
    return Status::OK();
  }
  CollectionValue* collection = reinterpret_cast<CollectionValue*>(GetSlot(tuple));
  *collection = CollectionValue();
  const TupleDescriptor* item_tuple_desc = slot_desc_->children_tuple_descriptor();
  DCHECK(item_tuple_desc != nullptr) << "There is no children tuple for slot ID: " <<
      slot_desc_->id();
  CollectionValueBuilder builder(collection, *item_tuple_desc, pool, scanner_->state_);
  return scanner_->AssembleCollection(*this, row_idx, &builder);
}
// Returns the number of elements this reader can produce from the current batch.
//  - Direct reader: the number of lists in the batch (0 without a batch).
//  - No children: the last offset of the batch (0 without a batch).
//  - Otherwise: whatever the first child reports.
int OrcListReader::NumElements() const {
  if (DirectReader()) {
    if (batch_ == nullptr) return 0;
    return batch_->numElements;
  }
  if (!children_.empty()) return children_[0]->NumElements();
  if (batch_ == nullptr) return 0;
  return batch_->offsets[batch_->numElements];
}
// Reads the value at 'row_idx' into 'tuple'. A direct reader assembles the whole
// collection. Otherwise every child reads its value at 'row_idx' and, if a position
// slot was registered, the position of the item is written as well.
Status OrcListReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
  if (DirectReader()) return AssembleCollection(row_idx, tuple, pool);
  for (OrcColumnReader* item_reader : children_) {
    RETURN_IF_ERROR(item_reader->ReadValue(row_idx, tuple, pool));
  }
  if (pos_slot_desc_ == nullptr) return Status::OK();
  RETURN_IF_ERROR(SetPositionSlot(row_idx, tuple));
  return Status::OK();
}
// Reads the value at 'row_idx' into 'tuple'. A direct reader assembles the whole
// collection; otherwise every child reads its value at 'row_idx'. Stops at the first
// child error.
Status OrcMapReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
  if (!DirectReader()) {
    for (OrcColumnReader* entry_reader : children_) {
      RETURN_IF_ERROR(entry_reader->ReadValue(row_idx, tuple, pool));
    }
    return Status::OK();
  }
  return AssembleCollection(row_idx, tuple, pool);
}
// Computes the position of the item 'row_idx' inside the list that contains it and
// writes that position into the position slot of 'tuple'.
// 'list_idx_' is the index of the list the search starts from; it is advanced here
// when 'row_idx' turns out to belong to a later list, and it is reset to 0 by
// UpdateInputBatch(). Requires 'pos_slot_desc_' to be set.
// Returns an error status if no non-negative position could be determined, which can
// be caused by a corrupt file.
Status OrcListReader::SetPositionSlot(int row_idx, Tuple* tuple) {
DCHECK(pos_slot_desc_ != nullptr);
// -1 means "not found"; it is only overwritten by one of the branches below.
int64_t pos = -1;
DCHECK_LT(list_idx_, batch_->numElements);
// If 'list_idx_' is the last list of the batch, the item is attributed to it without
// looking at the end offset. Otherwise 'row_idx' must lie in
// [offsets[list_idx_], offsets[list_idx_+1]).
if (list_idx_ == batch_->numElements - 1 ||
(batch_->offsets[list_idx_] <= row_idx && row_idx < batch_->offsets[list_idx_+1])) {
// We are somewhere in the current list.
pos = row_idx - batch_->offsets[list_idx_];
} else if (row_idx == batch_->offsets[list_idx_+1]) {
// Let's move to the next list.
pos = 0;
list_idx_ += 1;
}
else if (row_idx > batch_->offsets[list_idx_+1]) {
// We lagged behind. Let's find our list.
// Linear scan forward for the first list whose end offset is beyond 'row_idx'.
for (int i = list_idx_; i < batch_->numElements; ++i) {
if (row_idx < batch_->offsets[i+1]) {
pos = row_idx - batch_->offsets[i];
list_idx_ = i;
break;
}
}
}
// 'pos' is still negative if 'row_idx' precedes the current list or if the scan
// above found no list containing it.
if (pos < 0) {
// Oops, something went wrong. It can be caused by a corrupt file, so let's raise
// an error.
return Status(Substitute(
"ORC list indexes and elements are inconsistent in file $0",
scanner_->filename()));
}
int64_t* slot_val_ptr = reinterpret_cast<int64_t*>(tuple->GetSlot(
pos_slot_desc_->tuple_offset()));
*slot_val_ptr = pos;
return Status::OK();
}
// Registers 'slot_desc' with this list reader. If the slot refers to this LIST type
// but is not a collection slot, it is the position slot and is only remembered in
// 'pos_slot_desc_' (at most one is expected). Otherwise a child reader is created on
// the item type of the list and appended to 'children_'.
void OrcListReader::CreateChildForSlot(const orc::Type* node,
    const SlotDescriptor* slot_desc) {
  const uint64_t target_col_id = GetTargetColId(slot_desc);
  DCHECK(IsDescendant(*node, target_col_id));
  // We have a position slot descriptor if it refers to this LIST ORC type, but it
  // isn't a collection slot.
  const bool is_pos_slot = target_col_id == node->getColumnId() &&
      slot_desc->children_tuple_descriptor() == nullptr;
  if (!is_pos_slot) {
    children_.push_back(
        OrcColumnReader::Create(node->getSubtype(0), slot_desc, scanner_));
    return;
  }
  DCHECK(pos_slot_desc_ == nullptr) << "Should have unique pos slot";
  pos_slot_desc_ = slot_desc;
}
// Constructor for top level list readers, driven by a tuple descriptor. A
// materializing reader registers every slot of its tuple descriptor; otherwise a top
// level child reader is created on the item type of the list.
OrcListReader::OrcListReader(const orc::Type* node,
    const TupleDescriptor* table_tuple_desc, HdfsOrcScanner* scanner)
  : OrcCollectionReader(node, table_tuple_desc, scanner) {
  if (!materialize_tuple_) {
    children_.push_back(OrcComplexColumnReader::CreateTopLevelReader(
        node->getSubtype(0), table_tuple_desc, scanner));
    return;
  }
  DCHECK(tuple_desc_ != nullptr);
  for (SlotDescriptor* item_slot : tuple_desc_->slots()) {
    CreateChildForSlot(node, item_slot);
  }
}
// Constructor driven by a slot descriptor. A materializing reader registers every
// slot of the item tuple descriptor. Otherwise 'slot_desc' matches a descendant
// instead, and a child reader is created for the child node lying in the path to
// that descendant.
OrcListReader::OrcListReader(const orc::Type* node, const SlotDescriptor* slot_desc,
    HdfsOrcScanner* scanner) : OrcCollectionReader(node, slot_desc, scanner) {
  if (!materialize_tuple_) {
    CreateChildForSlot(node, slot_desc);
  } else {
    DCHECK(tuple_desc_ != nullptr);
    for (SlotDescriptor* item_slot : tuple_desc_->slots()) {
      CreateChildForSlot(node, item_slot);
    }
  }
  VLOG(3) << "Created ListReader for " << PrintNode(node) << ": tuple_desc_="
          << (tuple_desc_ != nullptr ? tuple_desc_->DebugString() : "null");
}
// Installs 'orc_batch' as the current list batch, passes its element batch (or null
// if there is no batch) to every child, and resets the list index used for position
// tracking.
Status OrcListReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
  RETURN_IF_ERROR(OrcComplexColumnReader::UpdateInputBatch(orc_batch));
  batch_ = static_cast<orc::ListVectorBatch*>(orc_batch);
  // In debug mode, we use dynamic_cast<> to double-check the downcast is legal
  DCHECK(batch_ == dynamic_cast<orc::ListVectorBatch*>(orc_batch));
  orc::ColumnVectorBatch* elements = nullptr;
  if (batch_) elements = batch_->elements.get();
  for (OrcColumnReader* item_reader : children_) {
    RETURN_IF_ERROR(item_reader->UpdateInputBatch(elements));
  }
  list_idx_ = 0;
  return Status::OK();
}
// Returns the number of items in the list at 'row_idx': the difference between its
// end and start offsets, or 0 if the list is NULL.
int OrcListReader::GetNumChildValues(int row_idx) const {
  if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) return 0;
  DCHECK_GT(batch_->offsets.size(), row_idx + 1);
  const auto& offsets = batch_->offsets;
  return offsets[row_idx + 1] - offsets[row_idx];
}
// Returns the start offset of the list at 'row_idx', as stored in the batch's
// offsets buffer.
int OrcListReader::GetChildBatchOffset(int row_idx) const {
  const auto& offsets = batch_->offsets;
  return offsets[row_idx];
}
// Reads the item number 'tuple_idx' of the list at 'row_idx' into 'tuple'. If a
// position slot was registered, 'tuple_idx' is written into it. Every child then
// reads its value from the child batch at the list's start offset plus 'tuple_idx'.
Status OrcListReader::ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple,
    MemPool* pool) const {
  DCHECK_LT(row_idx, batch_->numElements);
  const int list_start = batch_->offsets[row_idx];
  if (pos_slot_desc_ != nullptr) {
    void* pos_slot = tuple->GetSlot(pos_slot_desc_->tuple_offset());
    *reinterpret_cast<int64_t*>(pos_slot) = tuple_idx;
  }
  const int item_row = list_start + tuple_idx;
  for (OrcColumnReader* item_reader : children_) {
    RETURN_IF_ERROR(item_reader->ReadValue(item_row, tuple, pool));
  }
  return Status::OK();
}
// Creates the child reader responsible for 'slot_desc' on the map child (key or
// value) whose column id range contains the slot's target column. The reader is
// appended to 'children_' and additionally to 'key_readers_' when it was created on
// the first child (field 0), or to 'value_readers_' otherwise.
void OrcMapReader::CreateChildForSlot(const orc::Type* node,
    const SlotDescriptor* slot_desc) {
  const orc::Type* selected_type;
  int selected_field;
  const bool found =
      FindChild(*node, GetTargetColId(slot_desc), &selected_type, &selected_field);
  if (!found) {
    DCHECK(false) << PrintNode(node) << " has no children selected for "
                  << slot_desc->DebugString();
  }
  // Map type only has two children.
  DCHECK_LE(selected_field, 1);
  OrcColumnReader* entry_reader =
      OrcColumnReader::Create(selected_type, slot_desc, scanner_);
  children_.push_back(entry_reader);
  auto& readers = (selected_field == 0) ? key_readers_ : value_readers_;
  readers.push_back(entry_reader);
}
// Constructor for top level map readers, driven by a tuple descriptor. A
// materializing reader registers every slot of its tuple descriptor. Otherwise a top
// level child reader is created on the map child (key or value) that leads to the
// column of 'table_tuple_desc', and is registered as a key or value reader
// accordingly.
OrcMapReader::OrcMapReader(const orc::Type* node,
    const TupleDescriptor* table_tuple_desc, HdfsOrcScanner* scanner)
  : OrcCollectionReader(node, table_tuple_desc, scanner) {
  if (!materialize_tuple_) {
    const orc::Type* selected_type;
    int selected_field;
    const bool found = FindChild(
        *node, GetColId(table_tuple_desc), &selected_type, &selected_field);
    if (!found) {
      DCHECK(false) << PrintNode(node) << " has no children selected for "
                    << table_tuple_desc->DebugString();
    }
    // Map type only has two children.
    DCHECK_LE(selected_field, 1);
    OrcComplexColumnReader* top_level_child =
        OrcComplexColumnReader::CreateTopLevelReader(
            selected_type, table_tuple_desc, scanner);
    children_.push_back(top_level_child);
    if (selected_field != 0) {
      value_readers_.push_back(top_level_child);
    } else {
      key_readers_.push_back(top_level_child);
    }
  } else {
    DCHECK(tuple_desc_ != nullptr);
    for (SlotDescriptor* entry_slot : tuple_desc_->slots()) {
      CreateChildForSlot(node, entry_slot);
    }
  }
  VLOG(3) << "Created MapReader for " << PrintNode(node) << ": tuple_desc_="
          << (tuple_desc_ != nullptr ? tuple_desc_->DebugString() : "null");
}
// Constructor driven by a slot descriptor. A materializing reader registers every
// slot of the item tuple descriptor; otherwise a single child reader is created for
// 'slot_desc' itself.
OrcMapReader::OrcMapReader(const orc::Type* node, const SlotDescriptor* slot_desc,
    HdfsOrcScanner* scanner) : OrcCollectionReader(node, slot_desc, scanner) {
  if (!materialize_tuple_) {
    CreateChildForSlot(node, slot_desc);
    return;
  }
  DCHECK(tuple_desc_ != nullptr);
  for (SlotDescriptor* entry_slot : tuple_desc_->slots()) {
    CreateChildForSlot(node, entry_slot);
  }
}
// Installs 'orc_batch' as the current map batch. Key readers receive the keys batch
// and value readers receive the elements batch; both are null if there is no batch.
Status OrcMapReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
  RETURN_IF_ERROR(OrcComplexColumnReader::UpdateInputBatch(orc_batch));
  batch_ = static_cast<orc::MapVectorBatch*>(orc_batch);
  // In debug mode, we use dynamic_cast<> to double-check the downcast is legal
  DCHECK(batch_ == dynamic_cast<orc::MapVectorBatch*>(orc_batch));
  orc::ColumnVectorBatch* keys = nullptr;
  orc::ColumnVectorBatch* values = nullptr;
  if (batch_) {
    keys = batch_->keys.get();
    values = batch_->elements.get();
  }
  for (OrcColumnReader* key_reader : key_readers_) {
    RETURN_IF_ERROR(key_reader->UpdateInputBatch(keys));
  }
  for (OrcColumnReader* value_reader : value_readers_) {
    RETURN_IF_ERROR(value_reader->UpdateInputBatch(values));
  }
  return Status::OK();
}
int OrcMapReader::GetNumChildValues(int row_idx) const {
if (IsNull(batch_, row_idx)) return 0;
DCHECK_GT(batch_->offsets.size(), row_idx + 1);
return batch_->offsets[row_idx + 1] - batch_->offsets[row_idx];
}
// Returns the start offset of the map at 'row_idx', as stored in the batch's offsets
// buffer.
int OrcMapReader::GetChildBatchOffset(int row_idx) const {
  const auto& offsets = batch_->offsets;
  return offsets[row_idx];
}
// Reads the entry number 'tuple_idx' of the map at 'row_idx' into 'tuple': every
// child reads its value from the child batch at the map's start offset plus
// 'tuple_idx'. Stops at the first child error.
Status OrcMapReader::ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple,
    MemPool* pool) const {
  DCHECK_LT(row_idx, batch_->numElements);
  const int map_start = batch_->offsets[row_idx];
  const int entry_row = map_start + tuple_idx;
  for (OrcColumnReader* entry_reader : children_) {
    RETURN_IF_ERROR(entry_reader->ReadValue(entry_row, tuple, pool));
  }
  return Status::OK();
}
// Returns the number of elements this reader can produce from the current batch.
//  - Direct reader: the number of maps in the batch (0 without a batch).
//  - No children: the last offset of the batch (0 without a batch).
//  - Otherwise: whatever the first child reports.
int OrcMapReader::NumElements() const {
  if (DirectReader()) {
    if (batch_ == nullptr) return 0;
    return batch_->numElements;
  }
  if (!children_.empty()) return children_[0]->NumElements();
  if (batch_ == nullptr) return 0;
  return batch_->offsets[batch_->numElements];
}
} // namespace impala