blob: 8c28ab7147d51ce5d0146e8052bdae168e167cd8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet-collection-column-reader.h"
#include "runtime/collection-value-builder.h"
namespace impala {
// Closes this collection reader by forwarding Close() to every child reader in
// 'children_', in order. 'row_batch' is handed through to each child unchanged.
void CollectionColumnReader::Close(RowBatch* row_batch) {
  for (ParquetColumnReader* reader : children_) reader->Close(row_batch);
}
bool CollectionColumnReader::NextLevels() {
DCHECK(!children_.empty());
DCHECK_LE(rep_level_, new_collection_rep_level());
for (int c = 0; c < children_.size(); ++c) {
do {
// TODO: verify somewhere that all column readers are at end
if (!children_[c]->NextLevels()) return false;
} while (children_[c]->rep_level() > new_collection_rep_level());
}
UpdateDerivedState();
return true;
}
// Consumes the collection at the reader's current position, writing it into 'tuple'
// where applicable, and leaves the reader positioned on the next one.
//
// Behaviour depends on the reader's state:
//  - 'tuple_offset_' == -1: 'tuple' is not touched; the reader is only advanced with
//    NextLevels(). (Presumably -1 marks a collection that is not materialized --
//    confirm against where 'tuple_offset_' is set.)
//  - 'def_level_' >= max_def_level(): the collection slot at 'tuple_offset_' is
//    populated by ReadSlot(), allocating from 'pool'.
//  - otherwise: the value is NULL; the null indicator at 'null_indicator_offset_' is
//    set and the reader is advanced with NextLevels().
//
// Returns whatever the NextLevels() or ReadSlot() call that ran returned.
bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
  DCHECK_GE(rep_level_, 0);
  DCHECK_GE(def_level_, 0);
  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor())
      << "Caller should have called NextLevels() until we are ready to read a value";

  if (tuple_offset_ != -1) {
    if (def_level_ >= max_def_level()) {
      // Defined collection: ReadSlot() both fills the slot and advances the children.
      return ReadSlot(tuple->GetCollectionSlot(tuple_offset_), pool);
    }
    // Null value
    tuple->SetNull(null_indicator_offset_);
  }
  // Nothing was assembled on this path, so the levels are advanced explicitly. The
  // call is class-qualified, exactly as in the three-way branch this replaces.
  return CollectionColumnReader::NextLevels();
}
// Non-repeated variant of ReadValue(). It simply delegates to this class's
// ReadValue() with the same 'pool' and 'tuple' and returns its result. The call is
// class-qualified, so it always resolves to CollectionColumnReader::ReadValue().
bool CollectionColumnReader::ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
  const bool continue_execution = CollectionColumnReader::ReadValue(pool, tuple);
  return continue_execution;
}
// Reads up to 'max_values' collections into consecutive tuples laid out in
// 'tuple_mem', where tuple i starts at byte offset i * 'tuple_size'.
//
// Positions whose 'def_level_' is below def_level_of_immediate_repeated_ancestor()
// are skipped with NextLevels() and do not consume an output tuple. For every other
// position, the position slot is filled first (when 'pos_slot_desc_' is set) and the
// value is then read with ReadValue(), allocating from 'pool'.
//
// The loop stops when 'max_values' tuples were produced, when RowGroupAtEnd() is
// true, or when a NextLevels() / ReadValue() / ColReaderDebugAction() call returned
// false. On exit '*num_values' holds the number of tuples produced and the return
// value is the last "continue execution" status. If the initial NextLevels() call
// fails, false is returned and '*num_values' is left unwritten.
bool CollectionColumnReader::ReadValueBatch(MemPool* pool, int max_values,
    int tuple_size, uint8_t* tuple_mem, int* num_values) {
  // The loop below relies on 'def_level_' and 'rep_level_' being populated, so
  // NextLevels() has to have run at least once at the start of each row group.
  if (def_level_ == ParquetLevel::INVALID_LEVEL) {
    if (!NextLevels()) return false;
  }

  int num_read = 0;
  bool keep_going = true;
  while (num_read < max_values && !RowGroupAtEnd() && keep_going) {
    if (def_level_ < def_level_of_immediate_repeated_ancestor()) {
      // A containing repeated field is empty or NULL: skip without emitting a tuple.
      keep_going = NextLevels();
    } else {
      Tuple* dest = reinterpret_cast<Tuple*>(tuple_mem + num_read * tuple_size);
      // Fill in position slot if applicable
      if (pos_slot_desc_ != nullptr) {
        ReadPositionNonBatched(dest->GetBigIntSlot(pos_slot_desc()->tuple_offset()));
      }
      keep_going = ReadValue(pool, dest);
      ++num_read;
      if (SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(num_read)) {
        // '&=' (not '&&') so the debug action runs even if ReadValue() failed.
        keep_going &= ColReaderDebugAction(&num_read);
      }
    }
  }

  *num_values = num_read;
  return keep_going;
}
// Reads up to 'max_values' collections into consecutive tuples laid out in
// 'tuple_mem', where tuple i starts at byte offset i * 'tuple_size'. Each value is
// read with ReadNonRepeatedValue(), allocating from 'pool'.
//
// Unlike ReadValueBatch(), no positions are skipped and no position slot is written:
// every loop iteration produces exactly one tuple.
//
// The loop stops when 'max_values' tuples were produced, when RowGroupAtEnd() is
// true, or when ReadNonRepeatedValue() / ColReaderDebugAction() returned false. On
// exit '*num_values' holds the number of tuples produced and the return value is the
// last "continue execution" status. If the initial NextLevels() call fails, false is
// returned and '*num_values' is left unwritten.
bool CollectionColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
    int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) {
  // The loop below relies on 'def_level_' and 'rep_level_' being populated, so
  // NextLevels() has to have run at least once at the start of each row group.
  if (def_level_ == ParquetLevel::INVALID_LEVEL) {
    if (!NextLevels()) return false;
  }

  int num_read = 0;
  bool keep_going = true;
  while (num_read < max_values && !RowGroupAtEnd() && keep_going) {
    uint8_t* dest_mem = tuple_mem + num_read * tuple_size;
    keep_going = ReadNonRepeatedValue(pool, reinterpret_cast<Tuple*>(dest_mem));
    ++num_read;
    if (SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(num_read)) {
      // '&=' (not '&&') so the debug action runs even if the read failed.
      keep_going &= ColReaderDebugAction(&num_read);
    }
  }

  *num_values = num_read;
  return keep_going;
}
// Materializes the collection at the current position into '*slot'.
//
// '*slot' is first reset to a default-constructed CollectionValue. A
// CollectionValueBuilder over 'slot' (using this slot's collection item descriptor,
// 'pool', and the parent's state) is then handed to the parent's
// AssembleCollection() together with the child readers and
// new_collection_rep_level().
//
// Returns false if AssembleCollection() returns false; in that case the derived
// state is not refreshed. Otherwise refreshes it via UpdateDerivedState() and
// returns true.
bool CollectionColumnReader::ReadSlot(CollectionValue* slot, MemPool* pool) {
  DCHECK(!children_.empty());
  DCHECK_LE(rep_level_, new_collection_rep_level());

  // Recursively read the collection into a new CollectionValue.
  *slot = CollectionValue();
  CollectionValueBuilder builder(
      slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
  if (!parent_->AssembleCollection(children_, new_collection_rep_level(), &builder)) {
    return false;
  }

  // AssembleCollection() advances child readers, so we don't need to call NextLevels()
  UpdateDerivedState();
  return true;
}
void CollectionColumnReader::UpdateDerivedState() {
// We don't need to cap our def_level_ at max_def_level(). We always check def_level_
// >= max_def_level() to check if the collection is defined.
// TODO: consider capping def_level_ at max_def_level()
def_level_ = children_[0]->def_level();
rep_level_ = children_[0]->rep_level();
// All children should have been advanced to the beginning of the next collection
for (int i = 0; i < children_.size(); ++i) {
DCHECK_EQ(children_[i]->rep_level(), rep_level_);
if (def_level_ < max_def_level()) {
// Collection not defined
FILE_CHECK_EQ(children_[i]->def_level(), def_level_);
} else {
// Collection is defined
FILE_CHECK_GE(children_[i]->def_level(), max_def_level());
}
}
if (RowGroupAtEnd()) {
// No more values
pos_current_value_ = ParquetLevel::INVALID_POS;
} else if (rep_level_ <= max_rep_level() - 2) {
// Reset position counter if we are at the start of a new parent collection (i.e.,
// the current collection is the first item in a new parent collection).
pos_current_value_ = 0;
}
}
} // namespace impala